bundle2: localize handleoutput remote prompts...
Akihiko Odaki -
r29851:ccd436f7 3.9.1 stable
# bundle2.py - generic container format to transmit arbitrary data.
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
"""Handling of the new bundle2 format

The goal of bundle2 is to act as an atomic container to transmit a set of
payloads in an application agnostic way. It consists of a sequence of "parts"
that will be handed to and processed by the application layer.


General format architecture
===========================

The format is structured as follows:

- magic string
- stream level parameters
- payload parts (any number)
- end of stream marker.

The binary format
============================

All numbers are unsigned and big-endian.

stream level parameters
------------------------

The binary format is as follows:

:params size: int32

  The total number of bytes used by the parameters

:params value: arbitrary number of bytes

  A blob of `params size` containing the serialized version of all stream level
  parameters.

  The blob contains a space separated list of parameters. Parameters with value
  are stored in the form `<name>=<value>`. Both name and value are urlquoted.

  Empty names are forbidden.

  Names MUST start with a letter. If this first letter is lower case, the
  parameter is advisory and can be safely ignored. However when the first
  letter is capital, the parameter is mandatory and the bundling process MUST
  stop if it is not able to process it.

Stream parameters use a simple textual format for two main reasons:

- Stream level parameters should remain simple and we want to discourage any
  crazy usage.
- Textual data allows easy human inspection of a bundle2 header in case of
  troubles.

Any applicative level options MUST go into a bundle2 part instead.

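The stream level parameter encoding described above can be sketched in a few lines of standalone Python (helper names are hypothetical; in the module below the real logic lives in `bundle20._paramchunk` and `unbundle20._processallparams`):

```python
from urllib.parse import quote, unquote

def encodestreamparams(params):
    # params: list of (name, value-or-None) pairs
    blocks = []
    for name, value in params:
        name = quote(name, safe='')
        if value is not None:
            name = '%s=%s' % (name, quote(value, safe=''))
        blocks.append(name)
    # parameters are space separated, so urlquoting keeps spaces out of values
    return ' '.join(blocks)

def decodestreamparams(blob):
    params = {}
    for entry in blob.split(' '):
        parts = entry.split('=', 1)
        name = unquote(parts[0])
        value = unquote(parts[1]) if len(parts) > 1 else None
        # a lower case first letter means advisory, upper case means mandatory
        params[name] = value
    return params

blob = encodestreamparams([('Compression', 'GZ'), ('advisory', None)])
```

Round-tripping `blob` through `decodestreamparams` yields the original names and values.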
Payload part
------------------------

The binary format is as follows:

:header size: int32

  The total number of bytes used by the part header. When the header is empty
  (size = 0) this is interpreted as the end of stream marker.

:header:

    The header defines how to interpret the part. It contains two pieces of
    data: the part type, and the part parameters.

    The part type is used to route the part to an application level handler
    that can interpret the payload.

    Part parameters are passed to the application level handler. They are
    meant to convey information that will help the application level object
    interpret the part payload.

    The binary format of the header is as follows:

    :typesize: (one byte)

    :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)

    :partid: A 32 bit integer (unique in the bundle) that can be used to refer
             to this part.

    :parameters:

        Part parameters may have arbitrary content; the binary structure is::

            <mandatory-count><advisory-count><param-sizes><param-data>

        :mandatory-count: 1 byte, number of mandatory parameters

        :advisory-count:  1 byte, number of advisory parameters

        :param-sizes:

            N pairs of bytes, where N is the total number of parameters. Each
            pair contains (<size-of-key>, <size-of-value>) for one parameter.

        :param-data:

            A blob of bytes from which each parameter key and value can be
            retrieved using the list of size pairs stored in the previous
            field.

        Mandatory parameters come first, then the advisory ones.

        Each parameter's key MUST be unique within the part.

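The `<mandatory-count><advisory-count><param-sizes><param-data>` layout above can be sketched standalone (hypothetical helper names, mirroring the `_fpartparamcount` and `_makefpartparamsizes` struct formats used later in this module):

```python
import struct

def packpartparams(mandatory, advisory):
    # mandatory/advisory: lists of (key, value) byte-string pairs
    allparams = list(mandatory) + list(advisory)
    out = struct.pack('>BB', len(mandatory), len(advisory))
    # one (key size, value size) byte pair per parameter
    out += struct.pack('>' + 'BB' * len(allparams),
                       *[len(x) for kv in allparams for x in kv])
    # then all keys and values concatenated back to back
    out += b''.join(k + v for k, v in allparams)
    return out

def unpackpartparams(blob):
    nbm, nba = struct.unpack('>BB', blob[:2])
    total = nbm + nba
    sizes = struct.unpack('>' + 'BB' * total, blob[2:2 + 2 * total])
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = blob[offset:offset + ksize]
        offset += ksize
        value = blob[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    # mandatory parameters come first, then the advisory ones
    return params[:nbm], params[nbm:]
```

Note this sketch assumes key and value sizes each fit in one byte, exactly as the format dictates.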
:payload:

    The payload is a series of `<chunksize><chunkdata>`.

    `chunksize` is an int32, `chunkdata` are plain bytes (as much as
    `chunksize` says). The payload part is concluded by a zero size chunk.

    The current implementation always produces either zero or one chunk.
    This is an implementation limitation that will ultimately be lifted.

    `chunksize` can be negative to trigger special case processing. No such
    processing is in place yet.

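The `<chunksize><chunkdata>` payload framing can be sketched standalone (hypothetical helpers; a real reader would also have to handle the negative `chunksize` special case reserved above):

```python
import io
import struct

def writepayload(out, chunks):
    # each chunk is prefixed by its int32 size; a zero size chunk ends it all
    for chunk in chunks:
        out.write(struct.pack('>i', len(chunk)))
        out.write(chunk)
    out.write(struct.pack('>i', 0))

def readpayload(fp):
    while True:
        size = struct.unpack('>i', fp.read(4))[0]
        if size == 0:
            return          # end of payload marker
        if size < 0:
            raise ValueError('special chunk sizes not supported in sketch')
        yield fp.read(size)

buf = io.BytesIO()
writepayload(buf, [b'hello ', b'world'])
buf.seek(0)
```

Iterating `readpayload(buf)` then yields the two chunks back in order.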
Bundle processing
============================

Each part is processed in order using a "part handler". Handlers are registered
for a certain part type.

The matching of a part to its handler is case insensitive. The case of the
part type is used to know if a part is mandatory or advisory. If the part type
contains any uppercase char it is considered mandatory. When no handler is
known for a mandatory part, the process is aborted and an exception is raised.
If the part is advisory and no handler is known, the part is ignored. When the
process is aborted, the full bundle is still read from the stream to keep the
channel usable. But none of the parts read after an abort are processed. In the
future, dropping the stream may become an option for channels we do not care to
preserve.
"""

from __future__ import absolute_import

import errno
import re
import string
import struct
import sys

from .i18n import _
from . import (
    changegroup,
    error,
    obsolete,
    pushkey,
    tags,
    url,
    util,
)

urlerr = util.urlerr
urlreq = util.urlreq

_pack = struct.pack
_unpack = struct.unpack

_fstreamparamsize = '>i'
_fpartheadersize = '>i'
_fparttypesize = '>B'
_fpartid = '>I'
_fpayloadsize = '>i'
_fpartparamcount = '>BB'

preferedchunksize = 4096

_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')

def outdebug(ui, message):
    """debug regarding output stream (bundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-output: %s\n' % message)

def indebug(ui, message):
    """debug on input stream (unbundling)"""
    if ui.configbool('devel', 'bundle2.debug', False):
        ui.debug('bundle2-input: %s\n' % message)

def validateparttype(parttype):
    """raise ValueError if a parttype contains an invalid character"""
    if _parttypeforbidden.search(parttype):
        raise ValueError(parttype)

def _makefpartparamsizes(nbparams):
    """return a struct format to read part parameter sizes

    The number of parameters is variable so we need to build that format
    dynamically.
    """
    return '>' + ('BB' * nbparams)

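A quick standalone illustration of the dynamically built format: for two parameters the construction yields `'>BBBB'`, which `struct` can then use to read all four size bytes in one call (the re-implementation below simply mirrors `_makefpartparamsizes`):

```python
import struct

def makefpartparamsizes(nbparams):
    # same construction as _makefpartparamsizes above: one 'BB' per parameter
    return '>' + ('BB' * nbparams)

fmt = makefpartparamsizes(2)
# four one-byte sizes: key/value sizes for two parameters
sizes = struct.unpack(fmt, b'\x0b\x01\x04\x01')
```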
parthandlermapping = {}

def parthandler(parttype, params=()):
    """decorator that registers a function as a bundle2 part handler

    eg::

        @parthandler('myparttype', ('mandatory', 'param', 'handled'))
        def myparttypehandler(...):
            '''process a part of type "my part".'''
            ...
    """
    validateparttype(parttype)
    def _decorator(func):
        lparttype = parttype.lower() # enforce lower case matching.
        assert lparttype not in parthandlermapping
        parthandlermapping[lparttype] = func
        func.params = frozenset(params)
        return func
    return _decorator

class unbundlerecords(object):
    """keep record of what happens during an unbundle

    New records are added using `records.add('cat', obj)`, where 'cat' is a
    category of record and obj is an arbitrary object.

    `records['cat']` will return all entries of this category 'cat'.

    Iterating on the object itself will yield `('category', obj)` tuples
    for all entries.

    All iterations happen in chronological order.
    """

    def __init__(self):
        self._categories = {}
        self._sequences = []
        self._replies = {}

    def add(self, category, entry, inreplyto=None):
        """add a new record of a given category.

        The entry can then be retrieved in the list returned by
        self['category']."""
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))
        if inreplyto is not None:
            self.getreplies(inreplyto).add(category, entry)

    def getreplies(self, partid):
        """get the records that are replies to a specific part"""
        return self._replies.setdefault(partid, unbundlerecords())

    def __getitem__(self, cat):
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        return iter(self._sequences)

    def __len__(self):
        return len(self._sequences)

    def __nonzero__(self):
        return bool(self._sequences)

class bundleoperation(object):
    """an object that represents a single bundling process

    Its purpose is to carry unbundle-related objects and states.

    A new object should be created at the beginning of each bundle processing.
    The object is to be returned by the processing function.

    The object currently has very little content; it will ultimately contain:
    * an access to the repo the bundle is applied to,
    * a ui object,
    * a way to retrieve a transaction to add changes to the repo,
    * a way to record the result of processing each part,
    * a way to construct a bundle response when applicable.
    """

    def __init__(self, repo, transactiongetter, captureoutput=True):
        self.repo = repo
        self.ui = repo.ui
        self.records = unbundlerecords()
        self.gettransaction = transactiongetter
        self.reply = None
        self.captureoutput = captureoutput

class TransactionUnavailable(RuntimeError):
    pass

def _notransaction():
    """default method to get a transaction while processing a bundle

    Raise an exception to highlight the fact that no transaction was expected
    to be created"""
    raise TransactionUnavailable()

def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
    # transform me into unbundler.apply() as soon as the freeze is lifted
    tr.hookargs['bundle2'] = '1'
    if source is not None and 'source' not in tr.hookargs:
        tr.hookargs['source'] = source
    if url is not None and 'url' not in tr.hookargs:
        tr.hookargs['url'] = url
    return processbundle(repo, unbundler, lambda: tr, op=op)

def processbundle(repo, unbundler, transactiongetter=None, op=None):
    """This function processes a bundle, applying effects to/from a repo

    It iterates over each part then searches for and uses the proper handling
    code to process the part. Parts are processed in order.

    This is a very early version of this function that will be strongly
    reworked before final usage.

    An unknown mandatory part will abort the process.

    It is temporarily possible to provide a prebuilt bundleoperation to the
    function. This is used to ensure output is properly propagated in case of
    an error during the unbundling. This output capturing part will likely be
    reworked and this ability will probably go away in the process.
    """
    if op is None:
        if transactiongetter is None:
            transactiongetter = _notransaction
        op = bundleoperation(repo, transactiongetter)
    # todo:
    # - replace this with an init function soon.
    # - exception catching
    unbundler.params
    if repo.ui.debugflag:
        msg = ['bundle2-input-bundle:']
        if unbundler.params:
            msg.append(' %i params' % len(unbundler.params))
        if op.gettransaction is None:
            msg.append(' no-transaction')
        else:
            msg.append(' with-transaction')
        msg.append('\n')
        repo.ui.debug(''.join(msg))
    iterparts = enumerate(unbundler.iterparts())
    part = None
    nbpart = 0
    try:
        for nbpart, part in iterparts:
            _processpart(op, part)
    except Exception as exc:
        for nbpart, part in iterparts:
            # consume the bundle content
            part.seek(0, 2)
        # Small hack to let caller code distinguish exceptions from bundle2
        # processing from processing the old format. This is mostly needed
        # to handle different return codes to unbundle according to the type
        # of bundle. We should probably clean up or drop this return code
        # craziness in a future version.
        exc.duringunbundle2 = True
        salvaged = []
        replycaps = None
        if op.reply is not None:
            salvaged = op.reply.salvageoutput()
            replycaps = op.reply.capabilities
        exc._replycaps = replycaps
        exc._bundle2salvagedoutput = salvaged
        raise
    finally:
        repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)

    return op

def _processpart(op, part):
    """process a single part from a bundle

    The part is guaranteed to have been fully consumed when the function exits
    (even if an exception is raised)."""
    status = 'unknown' # used by debug output
    hardabort = False
    try:
        try:
            handler = parthandlermapping.get(part.type)
            if handler is None:
                status = 'unsupported-type'
                raise error.BundleUnknownFeatureError(parttype=part.type)
            indebug(op.ui, 'found a handler for part %r' % part.type)
            unknownparams = part.mandatorykeys - handler.params
            if unknownparams:
                unknownparams = list(unknownparams)
                unknownparams.sort()
                status = 'unsupported-params (%s)' % unknownparams
                raise error.BundleUnknownFeatureError(parttype=part.type,
                                                     params=unknownparams)
            status = 'supported'
        except error.BundleUnknownFeatureError as exc:
            if part.mandatory: # mandatory parts
                raise
            indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
            return # skip to part processing
        finally:
            if op.ui.debugflag:
                msg = ['bundle2-input-part: "%s"' % part.type]
                if not part.mandatory:
                    msg.append(' (advisory)')
                nbmp = len(part.mandatorykeys)
                nbap = len(part.params) - nbmp
                if nbmp or nbap:
                    msg.append(' (params:')
                    if nbmp:
                        msg.append(' %i mandatory' % nbmp)
                    if nbap:
                        msg.append(' %i advisory' % nbap)
                    msg.append(')')
                msg.append(' %s\n' % status)
                op.ui.debug(''.join(msg))

        # handler is called outside the above try block so that we don't
        # risk catching KeyErrors from anything other than the
        # parthandlermapping lookup (any KeyError raised by handler()
        # itself represents a defect of a different variety).
        output = None
        if op.captureoutput and op.reply is not None:
            op.ui.pushbuffer(error=True, subproc=True)
            output = ''
        try:
            handler(op, part)
        finally:
            if output is not None:
                output = op.ui.popbuffer()
            if output:
                outpart = op.reply.newpart('output', data=output,
                                           mandatory=False)
                outpart.addparam('in-reply-to', str(part.id), mandatory=False)
        # If exiting or interrupted, do not attempt to seek the stream in the
        # finally block below. This makes abort faster.
    except (SystemExit, KeyboardInterrupt):
        hardabort = True
        raise
    finally:
        # consume the part content to not corrupt the stream.
        if not hardabort:
            part.seek(0, 2)

def decodecaps(blob):
    """decode a bundle2 caps bytes blob into a dictionary

    The blob is a list of capabilities (one per line).
    Capabilities may have values using a line of the form::

        capability=value1,value2,value3

    The values are always a list."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urlreq.unquote(key)
        vals = [urlreq.unquote(v) for v in vals]
        caps[key] = vals
    return caps

def encodecaps(caps):
    """encode a bundle2 caps dictionary into a bytes blob"""
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urlreq.quote(ca)
        vals = [urlreq.quote(v) for v in vals]
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

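The caps blob format handled by `decodecaps`/`encodecaps` can be exercised standalone; the sketch below is a minimal re-implementation using `urllib.parse` directly, since the module above goes through Mercurial's `urlreq` compatibility wrapper:

```python
from urllib.parse import quote, unquote

def encodecaps(caps):
    # one capability per line, comma separated values after '='
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v, safe='') for v in caps[ca]]
        ca = quote(ca, safe='')
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = [unquote(v) for v in vals.split(',')]
        caps[unquote(key)] = vals
    return caps
```

Encoding then decoding a caps dictionary returns the original mapping.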
bundletypes = {
    "": ("", None),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (),           # special-cased below
    "HG10UN": ("HG10UN", None),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']

class bundle20(object):
    """represent an outgoing bundle2 container

    Use the `addparam` method to add stream level parameters and `newpart` to
    populate it. Then call `getchunks` to retrieve all the binary chunks of
    data that compose the bundle2 container."""

    _magicstring = 'HG20'

    def __init__(self, ui, capabilities=()):
        self.ui = ui
        self._params = []
        self._parts = []
        self.capabilities = dict(capabilities)
        self._compressor = util.compressors[None]()

    def setcompression(self, alg):
        """setup core part compression to <alg>"""
        if alg is None:
            return
        assert not any(n.lower() == 'compression' for n, v in self._params)
        self.addparam('Compression', alg)
        self._compressor = util.compressors[alg]()

    @property
    def nbparts(self):
        """total number of parts added to the bundler"""
        return len(self._parts)

    # methods used to define the bundle2 content
    def addparam(self, name, value=None):
        """add a stream level parameter"""
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        self._params.append((name, value))

    def addpart(self, part):
        """add a new part to the bundle2 container

        Parts contain the actual applicative payload."""
        assert part.id is None
        part.id = len(self._parts) # very cheap counter
        self._parts.append(part)

    def newpart(self, typeid, *args, **kwargs):
        """create a new part and add it to the container

        The part is directly added to the container. For now, this means that
        any failure to properly initialize the part after calling ``newpart``
        should result in a failure of the whole bundling process.

        You can still fall back to manually creating and adding one if you
        need better control."""
        part = bundlepart(typeid, *args, **kwargs)
        self.addpart(part)
        return part

    # methods used to generate the bundle2 stream
    def getchunks(self):
        if self.ui.debugflag:
            msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
            if self._params:
                msg.append(' (%i params)' % len(self._params))
            msg.append(' %i parts total\n' % len(self._parts))
            self.ui.debug(''.join(msg))
        outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
        yield self._magicstring
        param = self._paramchunk()
        outdebug(self.ui, 'bundle parameter: %s' % param)
        yield _pack(_fstreamparamsize, len(param))
        if param:
            yield param
        # starting compression
        for chunk in self._getcorechunk():
            yield self._compressor.compress(chunk)
        yield self._compressor.flush()

    def _paramchunk(self):
        """return an encoded version of all stream parameters"""
        blocks = []
        for par, value in self._params:
            par = urlreq.quote(par)
            if value is not None:
                value = urlreq.quote(value)
                par = '%s=%s' % (par, value)
            blocks.append(par)
        return ' '.join(blocks)

    def _getcorechunk(self):
        """yield chunks for the core part of the bundle

        (all but headers and parameters)"""
        outdebug(self.ui, 'start of parts')
        for part in self._parts:
            outdebug(self.ui, 'bundle part: "%s"' % part.type)
            for chunk in part.getchunks(ui=self.ui):
                yield chunk
        outdebug(self.ui, 'end of bundle')
        yield _pack(_fpartheadersize, 0)

    def salvageoutput(self):
        """return a list with a copy of all output parts in the bundle

        This is meant to be used during error handling to make sure we
        preserve server output"""
        salvaged = []
        for part in self._parts:
            if part.type.startswith('output'):
                salvaged.append(part.copy())
        return salvaged

class unpackermixin(object):
    """A mixin to extract bytes and struct data from a stream"""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (util.safehasattr(fp, 'seek') and
                          util.safehasattr(fp, 'tell'))

    def _unpack(self, format):
        """unpack this struct format from the stream"""
        data = self._readexact(struct.calcsize(format))
        return _unpack(format, data)

    def _readexact(self, size):
        """read exactly <size> bytes from the stream"""
        return changegroup.readexactly(self._fp, size)

    def seek(self, offset, whence=0):
        """move the underlying file pointer"""
        if self._seekable:
            return self._fp.seek(offset, whence)
        else:
            raise NotImplementedError(_('File pointer is not seekable'))

    def tell(self):
        """return the file offset, or None if file is not seekable"""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None

    def close(self):
        """close underlying file"""
        if util.safehasattr(self._fp, 'close'):
            return self._fp.close()

def getunbundler(ui, fp, magicstring=None):
    """return a valid unbundler object for a given magicstring"""
    if magicstring is None:
        magicstring = changegroup.readexactly(fp, 4)
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise error.Abort(_('not a Mercurial bundle'))
    unbundlerclass = formatmap.get(version)
    if unbundlerclass is None:
        raise error.Abort(_('unknown bundle version %s') % version)
    unbundler = unbundlerclass(ui, fp)
    indebug(ui, 'start processing of %s stream' % magicstring)
    return unbundler

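The magic string handling in `getunbundler` splits the first four bytes into a two byte magic and a two byte version before dispatching through `formatmap`; a standalone sketch of the same split (the handler names in the map are purely illustrative):

```python
def splitmagic(magicstring):
    # first two bytes identify the container family, next two the version
    magic, version = magicstring[0:2], magicstring[2:4]
    if magic != 'HG':
        raise ValueError('not a Mercurial bundle')
    return magic, version

# version string -> unbundler class, like formatmap above
formats = {'20': 'unbundle20'}

magic, version = splitmagic('HG20')
```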
class unbundle20(unpackermixin):
    """interpret a bundle2 stream

    This class is fed with a binary stream and yields parts through its
    `iterparts` method."""

    _magicstring = 'HG20'

    def __init__(self, ui, fp):
        """If header is specified, we do not read it out of the stream."""
        self.ui = ui
        self._decompressor = util.decompressors[None]
        self._compressed = None
        super(unbundle20, self).__init__(fp)

    @util.propertycache
    def params(self):
        """dictionary of stream level parameters"""
        indebug(self.ui, 'reading bundle2 stream parameters')
        params = {}
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            params = self._processallparams(params)
        return params

    def _processallparams(self, paramsblock):
        """process the blob of stream parameters, returning them as a dict"""
        params = util.sortdict()
        for p in paramsblock.split(' '):
            p = p.split('=', 1)
            p = [urlreq.unquote(i) for i in p]
            if len(p) < 2:
                p.append(None)
            self._processparam(*p)
            params[p[0]] = p[1]
        return params

    def _processparam(self, name, value):
        """process a parameter, applying its effect if needed

        Parameters starting with a lower case letter are advisory and will be
        ignored when unknown. Those starting with an upper case letter are
        mandatory, and this function will raise for unknown ones.

        Note: no options are currently supported. Any input will either be
        ignored or fail.
        """
        if not name:
            raise ValueError('empty parameter name')
        if name[0] not in string.letters:
            raise ValueError('non letter first character: %r' % name)
        try:
            handler = b2streamparamsmap[name.lower()]
        except KeyError:
            if name[0].islower():
                indebug(self.ui, "ignoring unknown parameter %r" % name)
            else:
                raise error.BundleUnknownFeatureError(params=(name,))
        else:
            handler(self, name, value)

    def _forwardchunks(self):
        """utility to transfer a bundle2 as binary

        This is made necessary by the fact that the 'getbundle' command over
        'ssh' has no way to know when the reply ends, relying on the bundle
        being interpreted to know its end. This is terrible and we are sorry,
        but we needed to move forward to get general delta enabled.
        """
        yield self._magicstring
        assert 'params' not in vars(self)
        paramssize = self._unpack(_fstreamparamsize)[0]
        if paramssize < 0:
            raise error.BundleValueError('negative bundle param size: %i'
                                         % paramssize)
        yield _pack(_fstreamparamsize, paramssize)
        if paramssize:
            params = self._readexact(paramssize)
            self._processallparams(params)
            yield params
        assert self._decompressor is util.decompressors[None]
        # From there, payload might need to be decompressed
        self._fp = self._decompressor(self._fp)
        emptycount = 0
        while emptycount < 2:
            # so we can brainlessly loop
            assert _fpartheadersize == _fpayloadsize
            size = self._unpack(_fpartheadersize)[0]
            yield _pack(_fpartheadersize, size)
            if size:
                emptycount = 0
            else:
                emptycount += 1
                continue
            if size == flaginterrupt:
                continue
            elif size < 0:
                raise error.BundleValueError('negative chunk size: %i' % size)
            yield self._readexact(size)

    def iterparts(self):
        """yield all parts contained in the stream"""
        # make sure params have been loaded
        self.params
        # From there, payload needs to be decompressed
        self._fp = self._decompressor(self._fp)
        indebug(self.ui, 'start extraction of bundle2 parts')
        headerblock = self._readpartheader()
        while headerblock is not None:
            part = unbundlepart(self.ui, headerblock, self._fp)
            yield part
            part.seek(0, 2)
            headerblock = self._readpartheader()
        indebug(self.ui, 'end of bundle2 stream')

    def _readpartheader(self):
        """read a part header size and return the bytes blob

        returns None if empty"""
        headersize = self._unpack(_fpartheadersize)[0]
        if headersize < 0:
            raise error.BundleValueError('negative part header size: %i'
                                         % headersize)
        indebug(self.ui, 'part header size: %i' % headersize)
        if headersize:
            return self._readexact(headersize)
        return None

    def compressed(self):
        self.params # load params
        return self._compressed

formatmap = {'20': unbundle20}

b2streamparamsmap = {}

def b2streamparamhandler(name):
    """register a handler for a stream level parameter"""
    def decorator(func):
        assert name not in b2streamparamsmap
        b2streamparamsmap[name] = func
        return func
    return decorator

@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
    """read compression parameter and install payload decompression"""
    if value not in util.decompressors:
        raise error.BundleUnknownFeatureError(params=(param,),
                                              values=(value,))
    unbundler._decompressor = util.decompressors[value]
    if value is not None:
        unbundler._compressed = True

class bundlepart(object):
    """A bundle2 part contains application level payload

    The part `type` is used to route the part to the application level
    handler.

    The part payload is contained in ``part.data``. It could be raw bytes or a
    generator of byte chunks.

    You can add parameters to the part using the ``addparam`` method.
    Parameters can be either mandatory (default) or advisory. The remote side
    should be able to safely ignore the advisory ones.

    Neither data nor parameters may be modified after generation has begun.
    """

    def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
                 data='', mandatory=True):
        validateparttype(parttype)
        self.id = None
        self.type = parttype
        self._data = data
        self._mandatoryparams = list(mandatoryparams)
        self._advisoryparams = list(advisoryparams)
        # checking for duplicated entries
        self._seenparams = set()
        for pname, __ in self._mandatoryparams + self._advisoryparams:
            if pname in self._seenparams:
                raise RuntimeError('duplicated params: %s' % pname)
            self._seenparams.add(pname)
        # status of the part's generation:
        # - None: not started,
        # - False: currently generated,
        # - True: generation done.
        self._generated = None
        self.mandatory = mandatory

866 866 def copy(self):
867 867 """return a copy of the part
868 868
869 869 The new part have the very same content but no partid assigned yet.
870 870 Parts with generated data cannot be copied."""
871 871 assert not util.safehasattr(self.data, 'next')
872 872 return self.__class__(self.type, self._mandatoryparams,
873 873 self._advisoryparams, self._data, self.mandatory)
874 874
875 875     # methods used to define the part content
876 876 @property
877 877 def data(self):
878 878 return self._data
879 879
880 880 @data.setter
881 881 def data(self, data):
882 882 if self._generated is not None:
883 883 raise error.ReadOnlyPartError('part is being generated')
884 884 self._data = data
885 885
886 886 @property
887 887 def mandatoryparams(self):
888 888 # make it an immutable tuple to force people through ``addparam``
889 889 return tuple(self._mandatoryparams)
890 890
891 891 @property
892 892 def advisoryparams(self):
893 893 # make it an immutable tuple to force people through ``addparam``
894 894 return tuple(self._advisoryparams)
895 895
896 896 def addparam(self, name, value='', mandatory=True):
897 897 if self._generated is not None:
898 898 raise error.ReadOnlyPartError('part is being generated')
899 899 if name in self._seenparams:
900 900 raise ValueError('duplicated params: %s' % name)
901 901 self._seenparams.add(name)
902 902 params = self._advisoryparams
903 903 if mandatory:
904 904 params = self._mandatoryparams
905 905 params.append((name, value))
906 906
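The parameter handling above can be modelled in isolation. A simplified sketch (not the real ``bundlepart`` class, which additionally refuses mutation once generation has started):

```python
# Simplified model of bundlepart.addparam: a parameter is routed to the
# mandatory or advisory list, and duplicate names are rejected.
class PartSketch(object):
    def __init__(self):
        self._mandatoryparams = []
        self._advisoryparams = []
        self._seenparams = set()

    def addparam(self, name, value='', mandatory=True):
        if name in self._seenparams:
            raise ValueError('duplicated params: %s' % name)
        self._seenparams.add(name)
        params = self._mandatoryparams if mandatory else self._advisoryparams
        params.append((name, value))

part = PartSketch()
part.addparam('version', '02')                    # mandatory by default
part.addparam('nbchanges', '3', mandatory=False)  # advisory, may be ignored
```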
907 907     # methods used to generate the bundle2 stream
908 908 def getchunks(self, ui):
909 909 if self._generated is not None:
910 910 raise RuntimeError('part can only be consumed once')
911 911 self._generated = False
912 912
913 913 if ui.debugflag:
914 914 msg = ['bundle2-output-part: "%s"' % self.type]
915 915 if not self.mandatory:
916 916 msg.append(' (advisory)')
917 917 nbmp = len(self.mandatoryparams)
918 918 nbap = len(self.advisoryparams)
919 919 if nbmp or nbap:
920 920 msg.append(' (params:')
921 921 if nbmp:
922 922 msg.append(' %i mandatory' % nbmp)
923 923 if nbap:
924 924                     msg.append(' %i advisory' % nbap)
925 925 msg.append(')')
926 926 if not self.data:
927 927 msg.append(' empty payload')
928 928 elif util.safehasattr(self.data, 'next'):
929 929 msg.append(' streamed payload')
930 930 else:
931 931 msg.append(' %i bytes payload' % len(self.data))
932 932 msg.append('\n')
933 933 ui.debug(''.join(msg))
934 934
935 935 #### header
936 936 if self.mandatory:
937 937 parttype = self.type.upper()
938 938 else:
939 939 parttype = self.type.lower()
940 940 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
941 941 ## parttype
942 942 header = [_pack(_fparttypesize, len(parttype)),
943 943 parttype, _pack(_fpartid, self.id),
944 944 ]
945 945 ## parameters
946 946 # count
947 947 manpar = self.mandatoryparams
948 948 advpar = self.advisoryparams
949 949 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
950 950 # size
951 951 parsizes = []
952 952 for key, value in manpar:
953 953 parsizes.append(len(key))
954 954 parsizes.append(len(value))
955 955 for key, value in advpar:
956 956 parsizes.append(len(key))
957 957 parsizes.append(len(value))
958 958 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
959 959 header.append(paramsizes)
960 960 # key, value
961 961 for key, value in manpar:
962 962 header.append(key)
963 963 header.append(value)
964 964 for key, value in advpar:
965 965 header.append(key)
966 966 header.append(value)
967 967 ## finalize header
968 968 headerchunk = ''.join(header)
969 969 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
970 970 yield _pack(_fpartheadersize, len(headerchunk))
971 971 yield headerchunk
972 972 ## payload
973 973 try:
974 974 for chunk in self._payloadchunks():
975 975 outdebug(ui, 'payload chunk size: %i' % len(chunk))
976 976 yield _pack(_fpayloadsize, len(chunk))
977 977 yield chunk
978 978 except GeneratorExit:
979 979 # GeneratorExit means that nobody is listening for our
980 980 # results anyway, so just bail quickly rather than trying
981 981 # to produce an error part.
982 982 ui.debug('bundle2-generatorexit\n')
983 983 raise
984 984 except BaseException as exc:
985 985 # backup exception data for later
986 986 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
987 987 % exc)
988 988 exc_info = sys.exc_info()
989 989 msg = 'unexpected error: %s' % exc
990 990 interpart = bundlepart('error:abort', [('message', msg)],
991 991 mandatory=False)
992 992 interpart.id = 0
993 993 yield _pack(_fpayloadsize, -1)
994 994 for chunk in interpart.getchunks(ui=ui):
995 995 yield chunk
996 996 outdebug(ui, 'closing payload chunk')
997 997 # abort current part payload
998 998 yield _pack(_fpayloadsize, 0)
999 999 raise exc_info[0], exc_info[1], exc_info[2]
1000 1000 # end of payload
1001 1001 outdebug(ui, 'closing payload chunk')
1002 1002 yield _pack(_fpayloadsize, 0)
1003 1003 self._generated = True
1004 1004
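The header layout emitted by ``getchunks`` can be sketched standalone, assuming the big-endian formats this module defines elsewhere (``_fparttypesize`` is ``'>B'``, ``_fpartid`` is ``'>I'``, ``_fpartparamcount`` is ``'>BB'``, and one ``'BB'`` pair of sizes per parameter). Mandatory parts advertise themselves with an upper-case type:

```python
import struct

# Sketch of the part header built in getchunks: type length, type,
# part id, parameter counts, per-parameter key/value sizes, then the
# key and value bytes themselves.
def packheader(parttype, partid, manpar, advpar):
    header = [struct.pack('>B', len(parttype)),
              parttype.encode('ascii'),
              struct.pack('>I', partid),
              struct.pack('>BB', len(manpar), len(advpar))]
    allpar = manpar + advpar
    sizes = []
    for key, value in allpar:
        sizes.append(len(key))
        sizes.append(len(value))
    header.append(struct.pack('>' + 'BB' * len(allpar), *sizes))
    for key, value in allpar:
        header.append(key.encode('ascii'))
        header.append(value.encode('ascii'))
    return b''.join(header)

hdr = packheader('CHANGEGROUP', 0, [('version', '02')], [])
```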
1005 1005 def _payloadchunks(self):
1006 1006         """yield chunks of the part payload
1007 1007
1008 1008 Exists to handle the different methods to provide data to a part."""
1009 1009 # we only support fixed size data now.
1010 1010 # This will be improved in the future.
1011 1011 if util.safehasattr(self.data, 'next'):
1012 1012 buff = util.chunkbuffer(self.data)
1013 1013 chunk = buff.read(preferedchunksize)
1014 1014 while chunk:
1015 1015 yield chunk
1016 1016 chunk = buff.read(preferedchunksize)
1017 1017 elif len(self.data):
1018 1018 yield self.data
1019 1019
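Stripped of the ``util.chunkbuffer`` plumbing, ``_payloadchunks`` reduces to re-chunking a stream into fixed-size reads. A standalone sketch (the chunk size is illustrative; upstream uses ``preferedchunksize``):

```python
import io

# Sketch of _payloadchunks: read fixed-size chunks from a buffered
# source until it is exhausted, yielding each chunk as produced.
def payloadchunks(fp, chunksize):
    chunk = fp.read(chunksize)
    while chunk:
        yield chunk
        chunk = fp.read(chunksize)

chunks = list(payloadchunks(io.BytesIO(b'abcdefghij'), 4))
```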
1020 1020
1021 1021 flaginterrupt = -1
1022 1022
1023 1023 class interrupthandler(unpackermixin):
1024 1024 """read one part and process it with restricted capability
1025 1025
1026 1026     This allows transmitting an exception raised on the producer side
1027 1027     during part iteration while the consumer is reading a part.
1028 1028 
1029 1029     Parts processed in this manner only have access to a ui object."""
1030 1030
1031 1031 def __init__(self, ui, fp):
1032 1032 super(interrupthandler, self).__init__(fp)
1033 1033 self.ui = ui
1034 1034
1035 1035 def _readpartheader(self):
1036 1036         """reads a part header size and returns the bytes blob
1037 1037
1038 1038 returns None if empty"""
1039 1039 headersize = self._unpack(_fpartheadersize)[0]
1040 1040 if headersize < 0:
1041 1041 raise error.BundleValueError('negative part header size: %i'
1042 1042 % headersize)
1043 1043 indebug(self.ui, 'part header size: %i\n' % headersize)
1044 1044 if headersize:
1045 1045 return self._readexact(headersize)
1046 1046 return None
1047 1047
1048 1048 def __call__(self):
1049 1049
1050 1050 self.ui.debug('bundle2-input-stream-interrupt:'
1051 1051 ' opening out of band context\n')
1052 1052 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1053 1053 headerblock = self._readpartheader()
1054 1054 if headerblock is None:
1055 1055 indebug(self.ui, 'no part found during interruption.')
1056 1056 return
1057 1057 part = unbundlepart(self.ui, headerblock, self._fp)
1058 1058 op = interruptoperation(self.ui)
1059 1059 _processpart(op, part)
1060 1060 self.ui.debug('bundle2-input-stream-interrupt:'
1061 1061 ' closing out of band context\n')
1062 1062
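The framing consumed here can be sketched on its own: each payload chunk is prefixed with a big-endian int32 size, a size of 0 ends the payload, and ``flaginterrupt`` (-1) asks the reader to process one out-of-band part before resuming. The callback below is a stub standing in for ``interrupthandler``, which in the real code reads a whole nested part from the stream:

```python
import io
import struct

FLAGINTERRUPT = -1

# Sketch of the size-prefixed payload framing with interrupt support.
def readpayload(fp, oninterrupt):
    data = []
    size = struct.unpack('>i', fp.read(4))[0]
    while size:
        if size == FLAGINTERRUPT:
            oninterrupt()  # real code: process one out-of-band part here
        elif size < 0:
            raise ValueError('negative payload chunk size: %i' % size)
        else:
            data.append(fp.read(size))
        size = struct.unpack('>i', fp.read(4))[0]
    return b''.join(data)

stream = io.BytesIO(struct.pack('>i', 3) + b'abc'
                    + struct.pack('>i', FLAGINTERRUPT)
                    + struct.pack('>i', 2) + b'de'
                    + struct.pack('>i', 0))
interrupts = []
payload = readpayload(stream, lambda: interrupts.append(True))
```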
1063 1063 class interruptoperation(object):
1064 1064     """A limited operation to be used by part handlers during interruption
1065 1065 
1066 1066     It only has access to a ui object.
1067 1067     """
1068 1068
1069 1069 def __init__(self, ui):
1070 1070 self.ui = ui
1071 1071 self.reply = None
1072 1072 self.captureoutput = False
1073 1073
1074 1074 @property
1075 1075 def repo(self):
1076 1076 raise RuntimeError('no repo access from stream interruption')
1077 1077
1078 1078 def gettransaction(self):
1079 1079 raise TransactionUnavailable('no repo access from stream interruption')
1080 1080
1081 1081 class unbundlepart(unpackermixin):
1082 1082 """a bundle part read from a bundle"""
1083 1083
1084 1084 def __init__(self, ui, header, fp):
1085 1085 super(unbundlepart, self).__init__(fp)
1086 1086 self.ui = ui
1087 1087 # unbundle state attr
1088 1088 self._headerdata = header
1089 1089 self._headeroffset = 0
1090 1090 self._initialized = False
1091 1091 self.consumed = False
1092 1092 # part data
1093 1093 self.id = None
1094 1094 self.type = None
1095 1095 self.mandatoryparams = None
1096 1096 self.advisoryparams = None
1097 1097 self.params = None
1098 1098 self.mandatorykeys = ()
1099 1099 self._payloadstream = None
1100 1100 self._readheader()
1101 1101 self._mandatory = None
1102 1102 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1103 1103 self._pos = 0
1104 1104
1105 1105 def _fromheader(self, size):
1106 1106 """return the next <size> byte from the header"""
1107 1107 offset = self._headeroffset
1108 1108 data = self._headerdata[offset:(offset + size)]
1109 1109 self._headeroffset = offset + size
1110 1110 return data
1111 1111
1112 1112 def _unpackheader(self, format):
1113 1113 """read given format from header
1114 1114
1115 1115         This automatically computes the size of the format to read."""
1116 1116 data = self._fromheader(struct.calcsize(format))
1117 1117 return _unpack(format, data)
1118 1118
1119 1119 def _initparams(self, mandatoryparams, advisoryparams):
1120 1120 """internal function to setup all logic related parameters"""
1121 1121 # make it read only to prevent people touching it by mistake.
1122 1122 self.mandatoryparams = tuple(mandatoryparams)
1123 1123 self.advisoryparams = tuple(advisoryparams)
1124 1124 # user friendly UI
1125 1125 self.params = util.sortdict(self.mandatoryparams)
1126 1126 self.params.update(self.advisoryparams)
1127 1127 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1128 1128
1129 1129 def _payloadchunks(self, chunknum=0):
1130 1130 '''seek to specified chunk and start yielding data'''
1131 1131 if len(self._chunkindex) == 0:
1132 1132 assert chunknum == 0, 'Must start with chunk 0'
1133 1133 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1134 1134 else:
1135 1135 assert chunknum < len(self._chunkindex), \
1136 1136 'Unknown chunk %d' % chunknum
1137 1137 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1138 1138
1139 1139 pos = self._chunkindex[chunknum][0]
1140 1140 payloadsize = self._unpack(_fpayloadsize)[0]
1141 1141 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1142 1142 while payloadsize:
1143 1143 if payloadsize == flaginterrupt:
1144 1144 # interruption detection, the handler will now read a
1145 1145 # single part and process it.
1146 1146 interrupthandler(self.ui, self._fp)()
1147 1147 elif payloadsize < 0:
1148 1148 msg = 'negative payload chunk size: %i' % payloadsize
1149 1149 raise error.BundleValueError(msg)
1150 1150 else:
1151 1151 result = self._readexact(payloadsize)
1152 1152 chunknum += 1
1153 1153 pos += payloadsize
1154 1154 if chunknum == len(self._chunkindex):
1155 1155 self._chunkindex.append((pos,
1156 1156 super(unbundlepart, self).tell()))
1157 1157 yield result
1158 1158 payloadsize = self._unpack(_fpayloadsize)[0]
1159 1159 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1160 1160
1161 1161 def _findchunk(self, pos):
1162 1162 '''for a given payload position, return a chunk number and offset'''
1163 1163 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1164 1164 if ppos == pos:
1165 1165 return chunk, 0
1166 1166 elif ppos > pos:
1167 1167 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1168 1168 raise ValueError('Unknown chunk')
1169 1169
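The lookup in ``_findchunk`` works against the ``(payload offset, file offset)`` tuples accumulated in ``_chunkindex``. A standalone copy with illustrative offsets:

```python
# Sketch of _findchunk: map an absolute payload position to a
# (chunk number, offset within that chunk) pair using the index of
# (payload offset, file offset) tuples recorded per chunk start.
def findchunk(chunkindex, pos):
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

# hypothetical index: three chunks starting at payload offsets 0/4096/8192
index = [(0, 100), (4096, 4204), (8192, 8308)]
```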
1170 1170 def _readheader(self):
1171 1171 """read the header and setup the object"""
1172 1172 typesize = self._unpackheader(_fparttypesize)[0]
1173 1173 self.type = self._fromheader(typesize)
1174 1174 indebug(self.ui, 'part type: "%s"' % self.type)
1175 1175 self.id = self._unpackheader(_fpartid)[0]
1176 1176 indebug(self.ui, 'part id: "%s"' % self.id)
1177 1177 # extract mandatory bit from type
1178 1178 self.mandatory = (self.type != self.type.lower())
1179 1179 self.type = self.type.lower()
1180 1180 ## reading parameters
1181 1181 # param count
1182 1182 mancount, advcount = self._unpackheader(_fpartparamcount)
1183 1183 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1184 1184 # param size
1185 1185 fparamsizes = _makefpartparamsizes(mancount + advcount)
1186 1186 paramsizes = self._unpackheader(fparamsizes)
1187 1187 # make it a list of couple again
1188 1188 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1189 1189 # split mandatory from advisory
1190 1190 mansizes = paramsizes[:mancount]
1191 1191 advsizes = paramsizes[mancount:]
1192 1192 # retrieve param value
1193 1193 manparams = []
1194 1194 for key, value in mansizes:
1195 1195 manparams.append((self._fromheader(key), self._fromheader(value)))
1196 1196 advparams = []
1197 1197 for key, value in advsizes:
1198 1198 advparams.append((self._fromheader(key), self._fromheader(value)))
1199 1199 self._initparams(manparams, advparams)
1200 1200 ## part payload
1201 1201 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1202 1202 # we read the data, tell it
1203 1203 self._initialized = True
1204 1204
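The parameter decoding in ``_readheader`` mirrors the packing on the sending side: the flat size list is paired into ``(keysize, valuesize)`` couples, split into mandatory and advisory runs, and the key/value bytes are sliced off the header blob in order. A self-contained sketch, assuming the one-byte-per-size ``'BB'`` format used elsewhere in this module:

```python
import struct

# Sketch of the parameter section of a part header being decoded.
def readparams(blob, mancount, advcount):
    nbpar = mancount + advcount
    sizes = struct.unpack('>' + 'BB' * nbpar, blob[:2 * nbpar])
    couples = list(zip(sizes[::2], sizes[1::2]))  # (keysize, valuesize)
    offset = 2 * nbpar
    params = []
    for keysize, valsize in couples:
        key = blob[offset:offset + keysize]
        offset += keysize
        value = blob[offset:offset + valsize]
        offset += valsize
        params.append((key, value))
    return params[:mancount], params[mancount:]

blob = struct.pack('>BBBB', 7, 2, 9, 1) + b'version02nbchanges3'
man, adv = readparams(blob, 1, 1)
```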
1205 1205 def read(self, size=None):
1206 1206 """read payload data"""
1207 1207 if not self._initialized:
1208 1208 self._readheader()
1209 1209 if size is None:
1210 1210 data = self._payloadstream.read()
1211 1211 else:
1212 1212 data = self._payloadstream.read(size)
1213 1213 self._pos += len(data)
1214 1214 if size is None or len(data) < size:
1215 1215 if not self.consumed and self._pos:
1216 1216 self.ui.debug('bundle2-input-part: total payload size %i\n'
1217 1217 % self._pos)
1218 1218 self.consumed = True
1219 1219 return data
1220 1220
1221 1221 def tell(self):
1222 1222 return self._pos
1223 1223
1224 1224 def seek(self, offset, whence=0):
1225 1225 if whence == 0:
1226 1226 newpos = offset
1227 1227 elif whence == 1:
1228 1228 newpos = self._pos + offset
1229 1229 elif whence == 2:
1230 1230 if not self.consumed:
1231 1231 self.read()
1232 1232 newpos = self._chunkindex[-1][0] - offset
1233 1233 else:
1234 1234 raise ValueError('Unknown whence value: %r' % (whence,))
1235 1235
1236 1236 if newpos > self._chunkindex[-1][0] and not self.consumed:
1237 1237 self.read()
1238 1238 if not 0 <= newpos <= self._chunkindex[-1][0]:
1239 1239 raise ValueError('Offset out of range')
1240 1240
1241 1241 if self._pos != newpos:
1242 1242 chunk, internaloffset = self._findchunk(newpos)
1243 1243 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1244 1244 adjust = self.read(internaloffset)
1245 1245 if len(adjust) != internaloffset:
1246 1246 raise error.Abort(_('Seek failed\n'))
1247 1247 self._pos = newpos
1248 1248
1249 1249 # These are only the static capabilities.
1250 1250 # Check the 'getrepocaps' function for the rest.
1251 1251 capabilities = {'HG20': (),
1252 1252 'error': ('abort', 'unsupportedcontent', 'pushraced',
1253 1253 'pushkey'),
1254 1254 'listkeys': (),
1255 1255 'pushkey': (),
1256 1256 'digests': tuple(sorted(util.DIGESTS.keys())),
1257 1257 'remote-changegroup': ('http', 'https'),
1258 1258 'hgtagsfnodes': (),
1259 1259 }
1260 1260
1261 1261 def getrepocaps(repo, allowpushback=False):
1262 1262 """return the bundle2 capabilities for a given repo
1263 1263
1264 1264 Exists to allow extensions (like evolution) to mutate the capabilities.
1265 1265 """
1266 1266 caps = capabilities.copy()
1267 1267 caps['changegroup'] = tuple(sorted(
1268 1268 changegroup.supportedincomingversions(repo)))
1269 1269 if obsolete.isenabled(repo, obsolete.exchangeopt):
1270 1270 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1271 1271 caps['obsmarkers'] = supportedformat
1272 1272 if allowpushback:
1273 1273 caps['pushback'] = ()
1274 1274 return caps
1275 1275
1276 1276 def bundle2caps(remote):
1277 1277 """return the bundle capabilities of a peer as dict"""
1278 1278 raw = remote.capable('bundle2')
1279 1279 if not raw and raw != '':
1280 1280 return {}
1281 1281 capsblob = urlreq.unquote(remote.capable('bundle2'))
1282 1282 return decodecaps(capsblob)
1283 1283
1284 1284 def obsmarkersversion(caps):
1285 1285 """extract the list of supported obsmarkers versions from a bundle2caps dict
1286 1286 """
1287 1287 obscaps = caps.get('obsmarkers', ())
1288 1288 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1289 1289
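The obsmarkers capability is advertised as version strings such as ``'V1'`` (see ``getrepocaps`` above); ``obsmarkersversion`` keeps only the integers. The function is small enough to run verbatim:

```python
# Extract supported obsmarkers versions from a bundle2 capabilities dict:
# keep entries shaped like 'V<n>' and strip the 'V' prefix.
def obsmarkersversion(caps):
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

versions = obsmarkersversion({'obsmarkers': ('V0', 'V1', 'other')})
```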
1290 1290 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
1291 1291 """Write a bundle file and return its filename.
1292 1292
1293 1293 Existing files will not be overwritten.
1294 1294 If no filename is specified, a temporary file is created.
1295 1295 bz2 compression can be turned off.
1296 1296 The bundle file will be deleted in case of errors.
1297 1297 """
1298 1298
1299 1299 if bundletype == "HG20":
1300 1300 bundle = bundle20(ui)
1301 1301 bundle.setcompression(compression)
1302 1302 part = bundle.newpart('changegroup', data=cg.getchunks())
1303 1303 part.addparam('version', cg.version)
1304 1304 if 'clcount' in cg.extras:
1305 1305 part.addparam('nbchanges', str(cg.extras['clcount']),
1306 1306 mandatory=False)
1307 1307 chunkiter = bundle.getchunks()
1308 1308 else:
1309 1309 # compression argument is only for the bundle2 case
1310 1310 assert compression is None
1311 1311 if cg.version != '01':
1312 1312 raise error.Abort(_('old bundle types only supports v1 '
1313 1313 'changegroups'))
1314 1314 header, comp = bundletypes[bundletype]
1315 1315 if comp not in util.compressors:
1316 1316 raise error.Abort(_('unknown stream compression type: %s')
1317 1317 % comp)
1318 1318 z = util.compressors[comp]()
1319 1319 subchunkiter = cg.getchunks()
1320 1320 def chunkiter():
1321 1321 yield header
1322 1322 for chunk in subchunkiter:
1323 1323 yield z.compress(chunk)
1324 1324 yield z.flush()
1325 1325 chunkiter = chunkiter()
1326 1326
1327 1327 # parse the changegroup data, otherwise we will block
1328 1328 # in case of sshrepo because we don't know the end of the stream
1329 1329 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1330 1330
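The bundle1 branch of ``writebundle`` wraps the changegroup chunks in a compressing generator: magic header first, then each chunk compressed, then the compressor's tail. A sketch with plain ``zlib`` standing in for the ``util.compressors`` lookup:

```python
import zlib

# Sketch of the bundle1 chunkiter: header, compressed chunks, flush.
def compressedchunks(header, chunks):
    z = zlib.compressobj()
    yield header
    for chunk in chunks:
        yield z.compress(chunk)
    yield z.flush()

out = b''.join(compressedchunks(b'HG10GZ', [b'chunk-one', b'chunk-two']))
```

Consuming the generator lazily is what lets the caller stream the bundle without knowing its total size in advance.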
1331 1331 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1332 1332 def handlechangegroup(op, inpart):
1333 1333 """apply a changegroup part on the repo
1334 1334
1335 1335     This is a very early implementation that will see massive rework
1336 1336     before being inflicted on any end-user.
1337 1337 """
1338 1338 # Make sure we trigger a transaction creation
1339 1339 #
1340 1340 # The addchangegroup function will get a transaction object by itself, but
1341 1341 # we need to make sure we trigger the creation of a transaction object used
1342 1342 # for the whole processing scope.
1343 1343 op.gettransaction()
1344 1344 unpackerversion = inpart.params.get('version', '01')
1345 1345 # We should raise an appropriate exception here
1346 1346 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1347 1347 # the source and url passed here are overwritten by the one contained in
1348 1348 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1349 1349 nbchangesets = None
1350 1350 if 'nbchanges' in inpart.params:
1351 1351 nbchangesets = int(inpart.params.get('nbchanges'))
1352 1352 if ('treemanifest' in inpart.params and
1353 1353 'treemanifest' not in op.repo.requirements):
1354 1354 if len(op.repo.changelog) != 0:
1355 1355 raise error.Abort(_(
1356 1356 "bundle contains tree manifests, but local repo is "
1357 1357 "non-empty and does not use tree manifests"))
1358 1358 op.repo.requirements.add('treemanifest')
1359 1359 op.repo._applyopenerreqs()
1360 1360 op.repo._writerequirements()
1361 1361 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1362 1362 op.records.add('changegroup', {'return': ret})
1363 1363 if op.reply is not None:
1364 1364 # This is definitely not the final form of this
1365 1365         # return. But one needs to start somewhere.
1366 1366 part = op.reply.newpart('reply:changegroup', mandatory=False)
1367 1367 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1368 1368 part.addparam('return', '%i' % ret, mandatory=False)
1369 1369 assert not inpart.read()
1370 1370
1371 1371 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1372 1372 ['digest:%s' % k for k in util.DIGESTS.keys()])
1373 1373 @parthandler('remote-changegroup', _remotechangegroupparams)
1374 1374 def handleremotechangegroup(op, inpart):
1375 1375     """apply a bundle10 on the repo, given a url and validation information
1376 1376
1377 1377     All the information about the remote bundle to import is given as
1378 1378 parameters. The parameters include:
1379 1379 - url: the url to the bundle10.
1380 1380 - size: the bundle10 file size. It is used to validate what was
1381 1381 retrieved by the client matches the server knowledge about the bundle.
1382 1382 - digests: a space separated list of the digest types provided as
1383 1383 parameters.
1384 1384 - digest:<digest-type>: the hexadecimal representation of the digest with
1385 1385 that name. Like the size, it is used to validate what was retrieved by
1386 1386 the client matches what the server knows about the bundle.
1387 1387
1388 1388 When multiple digest types are given, all of them are checked.
1389 1389 """
1390 1390 try:
1391 1391 raw_url = inpart.params['url']
1392 1392 except KeyError:
1393 1393 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1394 1394 parsed_url = util.url(raw_url)
1395 1395 if parsed_url.scheme not in capabilities['remote-changegroup']:
1396 1396 raise error.Abort(_('remote-changegroup does not support %s urls') %
1397 1397 parsed_url.scheme)
1398 1398
1399 1399 try:
1400 1400 size = int(inpart.params['size'])
1401 1401 except ValueError:
1402 1402 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1403 1403 % 'size')
1404 1404 except KeyError:
1405 1405 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1406 1406
1407 1407 digests = {}
1408 1408 for typ in inpart.params.get('digests', '').split():
1409 1409 param = 'digest:%s' % typ
1410 1410 try:
1411 1411 value = inpart.params[param]
1412 1412 except KeyError:
1413 1413 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1414 1414 param)
1415 1415 digests[typ] = value
1416 1416
1417 1417 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1418 1418
1419 1419 # Make sure we trigger a transaction creation
1420 1420 #
1421 1421 # The addchangegroup function will get a transaction object by itself, but
1422 1422 # we need to make sure we trigger the creation of a transaction object used
1423 1423 # for the whole processing scope.
1424 1424 op.gettransaction()
1425 1425 from . import exchange
1426 1426 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1427 1427 if not isinstance(cg, changegroup.cg1unpacker):
1428 1428 raise error.Abort(_('%s: not a bundle version 1.0') %
1429 1429 util.hidepassword(raw_url))
1430 1430 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1431 1431 op.records.add('changegroup', {'return': ret})
1432 1432 if op.reply is not None:
1433 1433 # This is definitely not the final form of this
1434 1434         # return. But one needs to start somewhere.
1435 1435 part = op.reply.newpart('reply:changegroup')
1436 1436 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1437 1437 part.addparam('return', '%i' % ret, mandatory=False)
1438 1438 try:
1439 1439 real_part.validate()
1440 1440 except error.Abort as e:
1441 1441 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1442 1442 (util.hidepassword(raw_url), str(e)))
1443 1443 assert not inpart.read()
1444 1444
1445 1445 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1446 1446 def handlereplychangegroup(op, inpart):
1447 1447 ret = int(inpart.params['return'])
1448 1448 replyto = int(inpart.params['in-reply-to'])
1449 1449 op.records.add('changegroup', {'return': ret}, replyto)
1450 1450
1451 1451 @parthandler('check:heads')
1452 1452 def handlecheckheads(op, inpart):
1453 1453 """check that head of the repo did not change
1454 1454
1455 1455 This is used to detect a push race when using unbundle.
1456 1456 This replaces the "heads" argument of unbundle."""
1457 1457 h = inpart.read(20)
1458 1458 heads = []
1459 1459 while len(h) == 20:
1460 1460 heads.append(h)
1461 1461 h = inpart.read(20)
1462 1462 assert not h
1463 1463 # Trigger a transaction so that we are guaranteed to have the lock now.
1464 1464 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1465 1465 op.gettransaction()
1466 1466 if sorted(heads) != sorted(op.repo.heads()):
1467 1467 raise error.PushRaced('repository changed while pushing - '
1468 1468 'please try again')
1469 1469
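The ``check:heads`` payload is a flat run of 20-byte node ids; reading stops at the first short (or empty) read, and a trailing fragment means the part is corrupt. The push-race check then compares the sorted list against ``sorted(op.repo.heads())``. A standalone sketch of the read loop:

```python
import io

# Sketch of the check:heads payload reader: consume 20-byte nodes
# until a short read marks the end of the part.
def readheads(fp):
    heads = []
    h = fp.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fp.read(20)
    assert not h  # payload must be a whole number of 20-byte nodes
    return heads

payload = b'a' * 20 + b'b' * 20
heads = readheads(io.BytesIO(payload))
```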
1470 1470 @parthandler('output')
1471 1471 def handleoutput(op, inpart):
1472 1472 """forward output captured on the server to the client"""
1473 1473 for line in inpart.read().splitlines():
1474 op.ui.status(_('remote: %s\n') % line)
1475 1475
1476 1476 @parthandler('replycaps')
1477 1477 def handlereplycaps(op, inpart):
1478 1478 """Notify that a reply bundle should be created
1479 1479
1480 1480 The payload contains the capabilities information for the reply"""
1481 1481 caps = decodecaps(inpart.read())
1482 1482 if op.reply is None:
1483 1483 op.reply = bundle20(op.ui, caps)
1484 1484
1485 1485 class AbortFromPart(error.Abort):
1486 1486 """Sub-class of Abort that denotes an error from a bundle2 part."""
1487 1487
1488 1488 @parthandler('error:abort', ('message', 'hint'))
1489 1489 def handleerrorabort(op, inpart):
1490 1490 """Used to transmit abort error over the wire"""
1491 1491 raise AbortFromPart(inpart.params['message'],
1492 1492 hint=inpart.params.get('hint'))
1493 1493
1494 1494 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1495 1495 'in-reply-to'))
1496 1496 def handleerrorpushkey(op, inpart):
1497 1497 """Used to transmit failure of a mandatory pushkey over the wire"""
1498 1498 kwargs = {}
1499 1499 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1500 1500 value = inpart.params.get(name)
1501 1501 if value is not None:
1502 1502 kwargs[name] = value
1503 1503 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1504 1504
1505 1505 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1506 1506 def handleerrorunsupportedcontent(op, inpart):
1507 1507 """Used to transmit unknown content error over the wire"""
1508 1508 kwargs = {}
1509 1509 parttype = inpart.params.get('parttype')
1510 1510 if parttype is not None:
1511 1511 kwargs['parttype'] = parttype
1512 1512 params = inpart.params.get('params')
1513 1513 if params is not None:
1514 1514 kwargs['params'] = params.split('\0')
1515 1515
1516 1516 raise error.BundleUnknownFeatureError(**kwargs)
1517 1517
1518 1518 @parthandler('error:pushraced', ('message',))
1519 1519 def handleerrorpushraced(op, inpart):
1520 1520 """Used to transmit push race error over the wire"""
1521 1521 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1522 1522
1523 1523 @parthandler('listkeys', ('namespace',))
1524 1524 def handlelistkeys(op, inpart):
1525 1525 """retrieve pushkey namespace content stored in a bundle2"""
1526 1526 namespace = inpart.params['namespace']
1527 1527 r = pushkey.decodekeys(inpart.read())
1528 1528 op.records.add('listkeys', (namespace, r))
1529 1529
1530 1530 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1531 1531 def handlepushkey(op, inpart):
1532 1532 """process a pushkey request"""
1533 1533 dec = pushkey.decode
1534 1534 namespace = dec(inpart.params['namespace'])
1535 1535 key = dec(inpart.params['key'])
1536 1536 old = dec(inpart.params['old'])
1537 1537 new = dec(inpart.params['new'])
1538 1538 # Grab the transaction to ensure that we have the lock before performing the
1539 1539 # pushkey.
1540 1540 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1541 1541 op.gettransaction()
1542 1542 ret = op.repo.pushkey(namespace, key, old, new)
1543 1543 record = {'namespace': namespace,
1544 1544 'key': key,
1545 1545 'old': old,
1546 1546 'new': new}
1547 1547 op.records.add('pushkey', record)
1548 1548 if op.reply is not None:
1549 1549 rpart = op.reply.newpart('reply:pushkey')
1550 1550 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1551 1551 rpart.addparam('return', '%i' % ret, mandatory=False)
1552 1552 if inpart.mandatory and not ret:
1553 1553 kwargs = {}
1554 1554 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1555 1555 if key in inpart.params:
1556 1556 kwargs[key] = inpart.params[key]
1557 1557 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1558 1558
1559 1559 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1560 1560 def handlepushkeyreply(op, inpart):
1561 1561 """retrieve the result of a pushkey request"""
1562 1562 ret = int(inpart.params['return'])
1563 1563 partid = int(inpart.params['in-reply-to'])
1564 1564 op.records.add('pushkey', {'return': ret}, partid)
1565 1565
1566 1566 @parthandler('obsmarkers')
1567 1567 def handleobsmarker(op, inpart):
1568 1568 """add a stream of obsmarkers to the repo"""
1569 1569 tr = op.gettransaction()
1570 1570 markerdata = inpart.read()
1571 1571 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1572 1572 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1573 1573 % len(markerdata))
1574 1574 # The mergemarkers call will crash if marker creation is not enabled.
1575 1575 # we want to avoid this if the part is advisory.
1576 1576 if not inpart.mandatory and op.repo.obsstore.readonly:
1577 1577         op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1578 1578 return
1579 1579 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1580 1580 if new:
1581 1581 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1582 1582 op.records.add('obsmarkers', {'new': new})
1583 1583 if op.reply is not None:
1584 1584 rpart = op.reply.newpart('reply:obsmarkers')
1585 1585 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1586 1586 rpart.addparam('new', '%i' % new, mandatory=False)
1587 1587
1588 1588
1589 1589 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1590 1590 def handleobsmarkerreply(op, inpart):
1591 1591     """retrieve the result of an obsmarkers request"""
1592 1592 ret = int(inpart.params['new'])
1593 1593 partid = int(inpart.params['in-reply-to'])
1594 1594 op.records.add('obsmarkers', {'new': ret}, partid)
1595 1595
1596 1596 @parthandler('hgtagsfnodes')
1597 1597 def handlehgtagsfnodes(op, inpart):
1598 1598 """Applies .hgtags fnodes cache entries to the local repo.
1599 1599
1600 1600 Payload is pairs of 20 byte changeset nodes and filenodes.
1601 1601 """
1602 1602 # Grab the transaction so we ensure that we have the lock at this point.
1603 1603 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1604 1604 op.gettransaction()
1605 1605 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1606 1606
1607 1607 count = 0
1608 1608 while True:
1609 1609 node = inpart.read(20)
1610 1610 fnode = inpart.read(20)
1611 1611 if len(node) < 20 or len(fnode) < 20:
1612 1612 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1613 1613 break
1614 1614 cache.setfnode(node, fnode)
1615 1615 count += 1
1616 1616
1617 1617 cache.write()
1618 1618 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
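The ``hgtagsfnodes`` payload read above is a sequence of fixed-width pairs: a 20-byte changeset node followed by a 20-byte ``.hgtags`` filenode, with an incomplete trailing pair silently ending the stream. A standalone sketch of the pair reader:

```python
import io

# Sketch of the hgtagsfnodes payload reader: collect (node, fnode)
# pairs of 20 bytes each, stopping at the first incomplete pair.
def readfnodepairs(fp):
    pairs = []
    while True:
        node = fp.read(20)
        fnode = fp.read(20)
        if len(node) < 20 or len(fnode) < 20:
            break
        pairs.append((node, fnode))
    return pairs

payload = b'n' * 20 + b'f' * 20
pairs = readfnodepairs(io.BytesIO(payload))
```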