bundle2: add a way to just forward the bundle2 stream to another user...
Pierre-Yves David -
r26542:b87e4638 default
@@ -1,1471 +1,1513
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
18 18 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
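
For illustration, a parameter blob carrying one mandatory and one advisory
stream parameter could look like this (names and values are hypothetical)::

    Compression=BZ someadvisoryparam=somevalue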
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler that
78 78 can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
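For illustration, a part carrying a single mandatory parameter
``('version', '02')`` and no advisory parameter would encode its parameter
block as (bytes shown with spaces between fields for readability)::

    \x01 \x00 \x07\x02 version02
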
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
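For illustration, an 11 byte payload sent as a single chunk is framed as::

    <int32: 11><11 bytes of data><int32: 0>
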
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155 import urllib
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 obsolete,
162 162 pushkey,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 _pack = struct.pack
169 169 _unpack = struct.unpack
170 170
171 171 _fstreamparamsize = '>i'
172 172 _fpartheadersize = '>i'
173 173 _fparttypesize = '>B'
174 174 _fpartid = '>I'
175 175 _fpayloadsize = '>i'
176 176 _fpartparamcount = '>BB'
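# For example, _pack(_fpartid, 42) yields the four big-endian bytes
# '\x00\x00\x00\x2a', and _unpack(_fpartid, '\x00\x00\x00\x2a') gives (42,).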
177 177
178 178 preferedchunksize = 4096
179 179
180 180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181 181
182 182 def outdebug(ui, message):
183 183 """debug regarding output stream (bundling)"""
184 184 if ui.configbool('devel', 'bundle2.debug', False):
185 185 ui.debug('bundle2-output: %s\n' % message)
186 186
187 187 def indebug(ui, message):
188 188 """debug on input stream (unbundling)"""
189 189 if ui.configbool('devel', 'bundle2.debug', False):
190 190 ui.debug('bundle2-input: %s\n' % message)
191 191
192 192 def validateparttype(parttype):
193 193 """raise ValueError if a parttype contains invalid character"""
194 194 if _parttypeforbidden.search(parttype):
195 195 raise ValueError(parttype)
196 196
197 197 def _makefpartparamsizes(nbparams):
198 198 """return a struct format to read part parameter sizes
199 199
200 200 The number of parameters is variable so we need to build that format
201 201 dynamically.
202 202 """
203 203 return '>'+('BB'*nbparams)
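# For example, _makefpartparamsizes(2) == '>BBBB': one (key size, value size)
# byte pair per parameter.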
204 204
205 205 parthandlermapping = {}
206 206
207 207 def parthandler(parttype, params=()):
208 208 """decorator that register a function as a bundle2 part handler
209 209
210 210 eg::
211 211
212 212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 213 def myparttypehandler(...):
214 214 '''process a part of type "my part".'''
215 215 ...
216 216 """
217 217 validateparttype(parttype)
218 218 def _decorator(func):
219 219 lparttype = parttype.lower() # enforce lower case matching.
220 220 assert lparttype not in parthandlermapping
221 221 parthandlermapping[lparttype] = func
222 222 func.params = frozenset(params)
223 223 return func
224 224 return _decorator
225 225
226 226 class unbundlerecords(object):
227 227 """keep record of what happens during and unbundle
228 228
229 229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
230 230 category of record and obj is an arbitrary object.
231 231
232 232 `records['cat']` will return all entries of this category 'cat'.
233 233
234 234 Iterating on the object itself will yield `('category', obj)` tuples
235 235 for all entries.
236 236
237 237 All iterations happen in chronological order.
238 238 """
239 239
240 240 def __init__(self):
241 241 self._categories = {}
242 242 self._sequences = []
243 243 self._replies = {}
244 244
245 245 def add(self, category, entry, inreplyto=None):
246 246 """add a new record of a given category.
247 247
248 248 The entry can then be retrieved in the list returned by
249 249 self['category']."""
250 250 self._categories.setdefault(category, []).append(entry)
251 251 self._sequences.append((category, entry))
252 252 if inreplyto is not None:
253 253 self.getreplies(inreplyto).add(category, entry)
254 254
255 255 def getreplies(self, partid):
256 256 """get the records that are replies to a specific part"""
257 257 return self._replies.setdefault(partid, unbundlerecords())
258 258
259 259 def __getitem__(self, cat):
260 260 return tuple(self._categories.get(cat, ()))
261 261
262 262 def __iter__(self):
263 263 return iter(self._sequences)
264 264
265 265 def __len__(self):
266 266 return len(self._sequences)
267 267
268 268 def __nonzero__(self):
269 269 return bool(self._sequences)
270 270
271 271 class bundleoperation(object):
272 272 """an object that represents a single bundling process
273 273
274 274 Its purpose is to carry unbundle-related objects and states.
275 275
276 276 A new object should be created at the beginning of each bundle processing.
277 277 The object is to be returned by the processing function.
278 278
279 279 The object has very little content now; it will ultimately contain:
280 280 * an access to the repo the bundle is applied to,
281 281 * a ui object,
282 282 * a way to retrieve a transaction to add changes to the repo,
283 283 * a way to record the result of processing each part,
284 284 * a way to construct a bundle response when applicable.
285 285 """
286 286
287 287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 288 self.repo = repo
289 289 self.ui = repo.ui
290 290 self.records = unbundlerecords()
291 291 self.gettransaction = transactiongetter
292 292 self.reply = None
293 293 self.captureoutput = captureoutput
294 294
295 295 class TransactionUnavailable(RuntimeError):
296 296 pass
297 297
298 298 def _notransaction():
299 299 """default method to get a transaction while processing a bundle
300 300
301 301 Raise an exception to highlight the fact that no transaction was expected
302 302 to be created"""
303 303 raise TransactionUnavailable()
304 304
305 305 def processbundle(repo, unbundler, transactiongetter=None, op=None):
306 306 """This function process a bundle, apply effect to/from a repo
307 307
308 308 It iterates over each part then searches for and uses the proper handling
309 309 code to process the part. Parts are processed in order.
310 310
311 311 This is a very early version of this function that will be strongly reworked
312 312 before final usage.
313 313
314 314 An unknown mandatory part will abort the process.
315 315
316 316 It is temporarily possible to provide a prebuilt bundleoperation to the
317 317 function. This is used to ensure output is properly propagated in case of
318 318 an error during the unbundling. This output capturing part will likely be
319 319 reworked and this ability will probably go away in the process.
320 320 """
321 321 if op is None:
322 322 if transactiongetter is None:
323 323 transactiongetter = _notransaction
324 324 op = bundleoperation(repo, transactiongetter)
325 325 # todo:
326 326 # - replace this with an init function soon.
327 327 # - exception catching
328 328 unbundler.params
329 329 if repo.ui.debugflag:
330 330 msg = ['bundle2-input-bundle:']
331 331 if unbundler.params:
332 332 msg.append(' %i params' % len(unbundler.params))
333 333 if op.gettransaction is None:
334 334 msg.append(' no-transaction')
335 335 else:
336 336 msg.append(' with-transaction')
337 337 msg.append('\n')
338 338 repo.ui.debug(''.join(msg))
339 339 iterparts = enumerate(unbundler.iterparts())
340 340 part = None
341 341 nbpart = 0
342 342 try:
343 343 for nbpart, part in iterparts:
344 344 _processpart(op, part)
345 345 except BaseException as exc:
346 346 for nbpart, part in iterparts:
347 347 # consume the bundle content
348 348 part.seek(0, 2)
349 349 # Small hack to let caller code distinguish exceptions raised during
350 350 # bundle2 processing from those raised by old-format processing. This is mostly
351 351 # needed to handle different return codes to unbundle according to the
352 352 # type of bundle. We should probably clean up or drop this return code
353 353 # craziness in a future version.
354 354 exc.duringunbundle2 = True
355 355 salvaged = []
356 356 replycaps = None
357 357 if op.reply is not None:
358 358 salvaged = op.reply.salvageoutput()
359 359 replycaps = op.reply.capabilities
360 360 exc._replycaps = replycaps
361 361 exc._bundle2salvagedoutput = salvaged
362 362 raise
363 363 finally:
364 364 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
365 365
366 366 return op
367 367
368 368 def _processpart(op, part):
369 369 """process a single part from a bundle
370 370
371 371 The part is guaranteed to have been fully consumed when the function exits
372 372 (even if an exception is raised)."""
373 373 status = 'unknown' # used by debug output
374 374 try:
375 375 try:
376 376 handler = parthandlermapping.get(part.type)
377 377 if handler is None:
378 378 status = 'unsupported-type'
379 379 raise error.BundleUnknownFeatureError(parttype=part.type)
380 380 indebug(op.ui, 'found a handler for part %r' % part.type)
381 381 unknownparams = part.mandatorykeys - handler.params
382 382 if unknownparams:
383 383 unknownparams = list(unknownparams)
384 384 unknownparams.sort()
385 385 status = 'unsupported-params (%s)' % unknownparams
386 386 raise error.BundleUnknownFeatureError(parttype=part.type,
387 387 params=unknownparams)
388 388 status = 'supported'
389 389 except error.BundleUnknownFeatureError as exc:
390 390 if part.mandatory: # mandatory parts
391 391 raise
392 392 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
393 393 return # skip to part processing
394 394 finally:
395 395 if op.ui.debugflag:
396 396 msg = ['bundle2-input-part: "%s"' % part.type]
397 397 if not part.mandatory:
398 398 msg.append(' (advisory)')
399 399 nbmp = len(part.mandatorykeys)
400 400 nbap = len(part.params) - nbmp
401 401 if nbmp or nbap:
402 402 msg.append(' (params:')
403 403 if nbmp:
404 404 msg.append(' %i mandatory' % nbmp)
405 405 if nbap:
406 406 msg.append(' %i advisory' % nbap)
407 407 msg.append(')')
408 408 msg.append(' %s\n' % status)
409 409 op.ui.debug(''.join(msg))
410 410
411 411 # handler is called outside the above try block so that we don't
412 412 # risk catching KeyErrors from anything other than the
413 413 # parthandlermapping lookup (any KeyError raised by handler()
414 414 # itself represents a defect of a different variety).
415 415 output = None
416 416 if op.captureoutput and op.reply is not None:
417 417 op.ui.pushbuffer(error=True, subproc=True)
418 418 output = ''
419 419 try:
420 420 handler(op, part)
421 421 finally:
422 422 if output is not None:
423 423 output = op.ui.popbuffer()
424 424 if output:
425 425 outpart = op.reply.newpart('output', data=output,
426 426 mandatory=False)
427 427 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
428 428 finally:
429 429 # consume the part content to not corrupt the stream.
430 430 part.seek(0, 2)
431 431
432 432
433 433 def decodecaps(blob):
434 434 """decode a bundle2 caps bytes blob into a dictionary
435 435
436 436 The blob is a list of capabilities (one per line)
437 437 Capabilities may have values using a line of the form::
438 438
439 439 capability=value1,value2,value3
440 440
441 441 The values are always a list."""
442 442 caps = {}
443 443 for line in blob.splitlines():
444 444 if not line:
445 445 continue
446 446 if '=' not in line:
447 447 key, vals = line, ()
448 448 else:
449 449 key, vals = line.split('=', 1)
450 450 vals = vals.split(',')
451 451 key = urllib.unquote(key)
452 452 vals = [urllib.unquote(v) for v in vals]
453 453 caps[key] = vals
454 454 return caps
455 455
456 456 def encodecaps(caps):
457 457 """encode a bundle2 caps dictionary into a bytes blob"""
458 458 chunks = []
459 459 for ca in sorted(caps):
460 460 vals = caps[ca]
461 461 ca = urllib.quote(ca)
462 462 vals = [urllib.quote(v) for v in vals]
463 463 if vals:
464 464 ca = "%s=%s" % (ca, ','.join(vals))
465 465 chunks.append(ca)
466 466 return '\n'.join(chunks)
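# Round-trip example (capability names as used later in this module, values
# illustrative):
#   encodecaps({'HG20': (), 'digests': ['sha1']}) == 'HG20\ndigests=sha1'
#   decodecaps('HG20\ndigests=sha1') == {'HG20': [], 'digests': ['sha1']}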
467 467
468 468 class bundle20(object):
469 469 """represent an outgoing bundle2 container
470 470
471 471 Use the `addparam` method to add stream level parameters and `newpart` to
472 472 populate it. Then call `getchunks` to retrieve all the binary chunks of
473 473 data that compose the bundle2 container."""
474 474
475 475 _magicstring = 'HG20'
476 476
477 477 def __init__(self, ui, capabilities=()):
478 478 self.ui = ui
479 479 self._params = []
480 480 self._parts = []
481 481 self.capabilities = dict(capabilities)
482 482 self._compressor = util.compressors[None]()
483 483
484 484 def setcompression(self, alg):
485 485 """setup core part compression to <alg>"""
486 486 if alg is None:
487 487 return
488 488 assert not any(n.lower() == 'compression' for n, v in self._params)
489 489 self.addparam('Compression', alg)
490 490 self._compressor = util.compressors[alg]()
491 491
492 492 @property
493 493 def nbparts(self):
494 494 """total number of parts added to the bundler"""
495 495 return len(self._parts)
496 496
497 497 # methods used to define the bundle2 content
498 498 def addparam(self, name, value=None):
499 499 """add a stream level parameter"""
500 500 if not name:
501 501 raise ValueError('empty parameter name')
502 502 if name[0] not in string.letters:
503 503 raise ValueError('non letter first character: %r' % name)
504 504 self._params.append((name, value))
505 505
506 506 def addpart(self, part):
507 507 """add a new part to the bundle2 container
508 508
509 509 Parts contains the actual applicative payload."""
510 510 assert part.id is None
511 511 part.id = len(self._parts) # very cheap counter
512 512 self._parts.append(part)
513 513
514 514 def newpart(self, typeid, *args, **kwargs):
515 515 """create a new part and add it to the containers
516 516
517 517 As the part is directly added to the containers. For now, this means
518 518 that any failure to properly initialize the part after calling
519 519 ``newpart`` should result in a failure of the whole bundling process.
520 520
521 521 You can still fall back to manually create and add if you need better
522 522 control."""
523 523 part = bundlepart(typeid, *args, **kwargs)
524 524 self.addpart(part)
525 525 return part
526 526
527 527 # methods used to generate the bundle2 stream
528 528 def getchunks(self):
529 529 if self.ui.debugflag:
530 530 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
531 531 if self._params:
532 532 msg.append(' (%i params)' % len(self._params))
533 533 msg.append(' %i parts total\n' % len(self._parts))
534 534 self.ui.debug(''.join(msg))
535 535 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
536 536 yield self._magicstring
537 537 param = self._paramchunk()
538 538 outdebug(self.ui, 'bundle parameter: %s' % param)
539 539 yield _pack(_fstreamparamsize, len(param))
540 540 if param:
541 541 yield param
542 542 # starting compression
543 543 for chunk in self._getcorechunk():
544 544 yield self._compressor.compress(chunk)
545 545 yield self._compressor.flush()
546 546
547 547 def _paramchunk(self):
548 548 """return a encoded version of all stream parameters"""
549 549 blocks = []
550 550 for par, value in self._params:
551 551 par = urllib.quote(par)
552 552 if value is not None:
553 553 value = urllib.quote(value)
554 554 par = '%s=%s' % (par, value)
555 555 blocks.append(par)
556 556 return ' '.join(blocks)
557 557
558 558 def _getcorechunk(self):
559 559 """yield chunk for the core part of the bundle
560 560
561 561 (all but headers and parameters)"""
562 562 outdebug(self.ui, 'start of parts')
563 563 for part in self._parts:
564 564 outdebug(self.ui, 'bundle part: "%s"' % part.type)
565 565 for chunk in part.getchunks(ui=self.ui):
566 566 yield chunk
567 567 outdebug(self.ui, 'end of bundle')
568 568 yield _pack(_fpartheadersize, 0)
569 569
570 570
571 571 def salvageoutput(self):
572 572 """return a list with a copy of all output parts in the bundle
573 573
574 574 This is meant to be used during error handling to make sure we preserve
575 575 server output"""
576 576 salvaged = []
577 577 for part in self._parts:
578 578 if part.type.startswith('output'):
579 579 salvaged.append(part.copy())
580 580 return salvaged
581 581
582 582
583 583 class unpackermixin(object):
584 584 """A mixin to extract bytes and struct data from a stream"""
585 585
586 586 def __init__(self, fp):
587 587 self._fp = fp
588 588 self._seekable = (util.safehasattr(fp, 'seek') and
589 589 util.safehasattr(fp, 'tell'))
590 590
591 591 def _unpack(self, format):
592 592 """unpack this struct format from the stream"""
593 593 data = self._readexact(struct.calcsize(format))
594 594 return _unpack(format, data)
595 595
596 596 def _readexact(self, size):
597 597 """read exactly <size> bytes from the stream"""
598 598 return changegroup.readexactly(self._fp, size)
599 599
600 600 def seek(self, offset, whence=0):
601 601 """move the underlying file pointer"""
602 602 if self._seekable:
603 603 return self._fp.seek(offset, whence)
604 604 else:
605 605 raise NotImplementedError(_('File pointer is not seekable'))
606 606
607 607 def tell(self):
608 608 """return the file offset, or None if file is not seekable"""
609 609 if self._seekable:
610 610 try:
611 611 return self._fp.tell()
612 612 except IOError as e:
613 613 if e.errno == errno.ESPIPE:
614 614 self._seekable = False
615 615 else:
616 616 raise
617 617 return None
618 618
619 619 def close(self):
620 620 """close underlying file"""
621 621 if util.safehasattr(self._fp, 'close'):
622 622 return self._fp.close()
623 623
624 624 def getunbundler(ui, fp, magicstring=None):
625 625 """return a valid unbundler object for a given magicstring"""
626 626 if magicstring is None:
627 627 magicstring = changegroup.readexactly(fp, 4)
628 628 magic, version = magicstring[0:2], magicstring[2:4]
629 629 if magic != 'HG':
630 630 raise util.Abort(_('not a Mercurial bundle'))
631 631 unbundlerclass = formatmap.get(version)
632 632 if unbundlerclass is None:
633 633 raise util.Abort(_('unknown bundle version %s') % version)
634 634 unbundler = unbundlerclass(ui, fp)
635 635 indebug(ui, 'start processing of %s stream' % magicstring)
636 636 return unbundler
637 637
638 638 class unbundle20(unpackermixin):
639 639 """interpret a bundle2 stream
640 640
641 641 This class is fed with a binary stream and yields parts through its
642 642 `iterparts` methods."""
643 643
644 _magicstring = 'HG20'
645
644 646 def __init__(self, ui, fp):
645 647 """If header is specified, we do not read it out of the stream."""
646 648 self.ui = ui
647 649 self._decompressor = util.decompressors[None]
648 650 super(unbundle20, self).__init__(fp)
649 651
650 652 @util.propertycache
651 653 def params(self):
652 654 """dictionary of stream level parameters"""
653 655 indebug(self.ui, 'reading bundle2 stream parameters')
654 656 params = {}
655 657 paramssize = self._unpack(_fstreamparamsize)[0]
656 658 if paramssize < 0:
657 659 raise error.BundleValueError('negative bundle param size: %i'
658 660 % paramssize)
659 661 if paramssize:
660 662 params = self._readexact(paramssize)
661 663 params = self._processallparams(params)
662 664 return params
663 665
664 666 def _processallparams(self, paramsblock):
665 667 """"""
666 668 params = {}
667 669 for p in paramsblock.split(' '):
668 670 p = p.split('=', 1)
669 671 p = [urllib.unquote(i) for i in p]
670 672 if len(p) < 2:
671 673 p.append(None)
672 674 self._processparam(*p)
673 675 params[p[0]] = p[1]
674 676 return params
675 677
676 678
677 679 def _processparam(self, name, value):
678 680 """process a parameter, applying its effect if needed
679 681
680 682 Parameters starting with a lower case letter are advisory and will be
681 683 ignored when unknown. Those starting with an upper case letter are
682 684 mandatory, and this function will raise a KeyError when they are unknown.
683 685
684 686 Note: no options are currently supported. Any input will either be
685 687 ignored or fail.
686 688 """
687 689 if not name:
688 690 raise ValueError('empty parameter name')
689 691 if name[0] not in string.letters:
690 692 raise ValueError('non letter first character: %r' % name)
691 693 try:
692 694 handler = b2streamparamsmap[name.lower()]
693 695 except KeyError:
694 696 if name[0].islower():
695 697 indebug(self.ui, "ignoring unknown parameter %r" % name)
696 698 else:
697 699 raise error.BundleUnknownFeatureError(params=(name,))
698 700 else:
699 701 handler(self, name, value)
700 702
703 def _forwardchunks(self):
704 """utility to transfer a bundle2 as binary
705
706 This is made necessary by the fact that the 'getbundle' command over 'ssh'
707 has no way to know when the reply ends, relying on the bundle being
708 interpreted to know its end. This is terrible and we are sorry, but we
709 needed to move forward to get general delta enabled.
710 """
711 yield self._magicstring
712 assert 'params' not in vars(self)
713 paramssize = self._unpack(_fstreamparamsize)[0]
714 if paramssize < 0:
715 raise error.BundleValueError('negative bundle param size: %i'
716 % paramssize)
717 yield _pack(_fstreamparamsize, paramssize)
718 if paramssize:
719 params = self._readexact(paramssize)
720 self._processallparams(params)
721 yield params
722 assert self._decompressor is util.decompressors[None]
723 # From there, payload might need to be decompressed
724 self._fp = self._decompressor(self._fp)
725 emptycount = 0
726 while emptycount < 2:
727 # so we can brainlessly loop
728 assert _fpartheadersize == _fpayloadsize
729 size = self._unpack(_fpartheadersize)[0]
730 yield _pack(_fpartheadersize, size)
731 if size:
732 emptycount = 0
733 else:
734 emptycount += 1
735 continue
736 if size == flaginterrupt:
737 continue
738 elif size < 0:
739 raise error.BundleValueError('negative chunk size: %i' % size)
740 yield self._readexact(size)
741
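# Hypothetical usage sketch (not part of this module): a server that only
# needs to relay a bundle could emit the raw stream directly, e.g.
#
#   for chunk in unbundler._forwardchunks():
#       out.write(chunk)
#
# instead of interpreting every part just to re-encode it.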
742
701 743 def iterparts(self):
702 744 """yield all parts contained in the stream"""
703 745 # make sure params have been loaded
704 746 self.params
705 747 # From there, payload needs to be decompressed
706 748 self._fp = self._decompressor(self._fp)
707 749 indebug(self.ui, 'start extraction of bundle2 parts')
708 750 headerblock = self._readpartheader()
709 751 while headerblock is not None:
710 752 part = unbundlepart(self.ui, headerblock, self._fp)
711 753 yield part
712 754 part.seek(0, 2)
713 755 headerblock = self._readpartheader()
714 756 indebug(self.ui, 'end of bundle2 stream')
715 757
716 758 def _readpartheader(self):
717 759 """reads a part header size and return the bytes blob
718 760
719 761 returns None if empty"""
720 762 headersize = self._unpack(_fpartheadersize)[0]
721 763 if headersize < 0:
722 764 raise error.BundleValueError('negative part header size: %i'
723 765 % headersize)
724 766 indebug(self.ui, 'part header size: %i' % headersize)
725 767 if headersize:
726 768 return self._readexact(headersize)
727 769 return None
728 770
729 771 def compressed(self):
730 772 return False
731 773
732 774 formatmap = {'20': unbundle20}
733 775
734 776 b2streamparamsmap = {}
735 777
736 778 def b2streamparamhandler(name):
737 779 """register a handler for a stream level parameter"""
738 780 def decorator(func):
739 781 assert name not in b2streamparamsmap
740 782 b2streamparamsmap[name] = func
741 783 return func
742 784 return decorator
743 785
744 786 @b2streamparamhandler('compression')
745 787 def processcompression(unbundler, param, value):
746 788 """read compression parameter and install payload decompression"""
747 789 if value not in util.decompressors:
748 790 raise error.BundleUnknownFeatureError(params=(param,),
749 791 values=(value,))
750 792 unbundler._decompressor = util.decompressors[value]
751 793
752 794 class bundlepart(object):
753 795 """A bundle2 part contains application level payload
754 796
755 797 The part `type` is used to route the part to the application level
756 798 handler.
757 799
758 800 The part payload is contained in ``part.data``. It could be raw bytes or a
759 801 generator of byte chunks.
760 802
761 803 You can add parameters to the part using the ``addparam`` method.
762 804 Parameters can be either mandatory (default) or advisory. Remote side
763 805 should be able to safely ignore the advisory ones.
764 806
765 807 Both data and parameters cannot be modified after the generation has begun.
766 808 """
767 809
768 810 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
769 811 data='', mandatory=True):
770 812 validateparttype(parttype)
771 813 self.id = None
772 814 self.type = parttype
773 815 self._data = data
774 816 self._mandatoryparams = list(mandatoryparams)
775 817 self._advisoryparams = list(advisoryparams)
776 818 # checking for duplicated entries
777 819 self._seenparams = set()
778 820 for pname, __ in self._mandatoryparams + self._advisoryparams:
779 821 if pname in self._seenparams:
780 822 raise RuntimeError('duplicated params: %s' % pname)
781 823 self._seenparams.add(pname)
782 824 # status of the part's generation:
783 825 # - None: not started,
784 826 # - False: currently generated,
785 827 # - True: generation done.
786 828 self._generated = None
787 829 self.mandatory = mandatory
788 830
789 831 def copy(self):
790 832 """return a copy of the part
791 833
792 834 The new part has the very same content but no partid assigned yet.
793 835 Parts with generated data cannot be copied."""
794 836 assert not util.safehasattr(self.data, 'next')
795 837 return self.__class__(self.type, self._mandatoryparams,
796 838 self._advisoryparams, self._data, self.mandatory)
797 839
798 840 # methods used to define the part content
799 841 def __setdata(self, data):
800 842 if self._generated is not None:
801 843 raise error.ReadOnlyPartError('part is being generated')
802 844 self._data = data
803 845 def __getdata(self):
804 846 return self._data
805 847 data = property(__getdata, __setdata)
806 848
807 849 @property
808 850 def mandatoryparams(self):
809 851 # make it an immutable tuple to force people through ``addparam``
810 852 return tuple(self._mandatoryparams)
811 853
812 854 @property
813 855 def advisoryparams(self):
814 856 # make it an immutable tuple to force people through ``addparam``
815 857 return tuple(self._advisoryparams)
816 858
817 859 def addparam(self, name, value='', mandatory=True):
818 860 if self._generated is not None:
819 861 raise error.ReadOnlyPartError('part is being generated')
820 862 if name in self._seenparams:
821 863 raise ValueError('duplicated params: %s' % name)
822 864 self._seenparams.add(name)
823 865 params = self._advisoryparams
824 866 if mandatory:
825 867 params = self._mandatoryparams
826 868 params.append((name, value))
827 869
828 870 # methods used to generate the bundle2 stream
829 871 def getchunks(self, ui):
830 872 if self._generated is not None:
831 873 raise RuntimeError('part can only be consumed once')
832 874 self._generated = False
833 875
834 876 if ui.debugflag:
835 877 msg = ['bundle2-output-part: "%s"' % self.type]
836 878 if not self.mandatory:
837 879 msg.append(' (advisory)')
838 880 nbmp = len(self.mandatoryparams)
839 881 nbap = len(self.advisoryparams)
840 882 if nbmp or nbap:
841 883 msg.append(' (params:')
842 884 if nbmp:
843 885 msg.append(' %i mandatory' % nbmp)
844 886 if nbap:
845 887 msg.append(' %i advisory' % nbap)
846 888 msg.append(')')
847 889 if not self.data:
848 890 msg.append(' empty payload')
849 891 elif util.safehasattr(self.data, 'next'):
850 892 msg.append(' streamed payload')
851 893 else:
852 894 msg.append(' %i bytes payload' % len(self.data))
853 895 msg.append('\n')
854 896 ui.debug(''.join(msg))
855 897
856 898 #### header
857 899 if self.mandatory:
858 900 parttype = self.type.upper()
859 901 else:
860 902 parttype = self.type.lower()
861 903 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
862 904 ## parttype
863 905 header = [_pack(_fparttypesize, len(parttype)),
864 906 parttype, _pack(_fpartid, self.id),
865 907 ]
866 908 ## parameters
867 909 # count
868 910 manpar = self.mandatoryparams
869 911 advpar = self.advisoryparams
870 912 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
871 913 # size
872 914 parsizes = []
873 915 for key, value in manpar:
874 916 parsizes.append(len(key))
875 917 parsizes.append(len(value))
876 918 for key, value in advpar:
877 919 parsizes.append(len(key))
878 920 parsizes.append(len(value))
879 921 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
880 922 header.append(paramsizes)
881 923 # key, value
882 924 for key, value in manpar:
883 925 header.append(key)
884 926 header.append(value)
885 927 for key, value in advpar:
886 928 header.append(key)
887 929 header.append(value)
888 930 ## finalize header
889 931 headerchunk = ''.join(header)
890 932 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
891 933 yield _pack(_fpartheadersize, len(headerchunk))
892 934 yield headerchunk
893 935 ## payload
894 936 try:
895 937 for chunk in self._payloadchunks():
896 938 outdebug(ui, 'payload chunk size: %i' % len(chunk))
897 939 yield _pack(_fpayloadsize, len(chunk))
898 940 yield chunk
899 941 except GeneratorExit:
900 942 # GeneratorExit means that nobody is listening for our
901 943 # results anyway, so just bail quickly rather than trying
902 944 # to produce an error part.
903 945 ui.debug('bundle2-generatorexit\n')
904 946 raise
905 947 except BaseException as exc:
906 948 # backup exception data for later
907 949 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
908 950 % exc)
909 951 exc_info = sys.exc_info()
910 952 msg = 'unexpected error: %s' % exc
911 953 interpart = bundlepart('error:abort', [('message', msg)],
912 954 mandatory=False)
913 955 interpart.id = 0
914 956 yield _pack(_fpayloadsize, -1)
915 957 for chunk in interpart.getchunks(ui=ui):
916 958 yield chunk
917 959 outdebug(ui, 'closing payload chunk')
918 960 # abort current part payload
919 961 yield _pack(_fpayloadsize, 0)
920 962 raise exc_info[0], exc_info[1], exc_info[2]
921 963 # end of payload
922 964 outdebug(ui, 'closing payload chunk')
923 965 yield _pack(_fpayloadsize, 0)
924 966 self._generated = True
925 967
926 968 def _payloadchunks(self):
927 969 """yield chunks of a the part payload
928 970
929 971 Exists to handle the different methods to provide data to a part."""
930 972 # we only support fixed size data now.
931 973 # This will be improved in the future.
932 974 if util.safehasattr(self.data, 'next'):
933 975 buff = util.chunkbuffer(self.data)
934 976 chunk = buff.read(preferedchunksize)
935 977 while chunk:
936 978 yield chunk
937 979 chunk = buff.read(preferedchunksize)
938 980 elif len(self.data):
939 981 yield self.data
940 982
941 983
942 984 flaginterrupt = -1
943 985
944 986 class interrupthandler(unpackermixin):
945 987 """read one part and process it with restricted capability
946 988
947 989 This allows to transmit exception raised on the producer size during part
948 990 iteration while the consumer is reading a part.
949 991
950 992 Part processed in this manner only have access to a ui object,"""
951 993
952 994 def __init__(self, ui, fp):
953 995 super(interrupthandler, self).__init__(fp)
954 996 self.ui = ui
955 997
956 998 def _readpartheader(self):
957 999 """reads a part header size and return the bytes blob
958 1000
959 1001 returns None if empty"""
960 1002 headersize = self._unpack(_fpartheadersize)[0]
961 1003 if headersize < 0:
962 1004 raise error.BundleValueError('negative part header size: %i'
963 1005 % headersize)
964 1006 indebug(self.ui, 'part header size: %i\n' % headersize)
965 1007 if headersize:
966 1008 return self._readexact(headersize)
967 1009 return None
968 1010
969 1011 def __call__(self):
970 1012
971 1013 self.ui.debug('bundle2-input-stream-interrupt:'
972 1014 ' opening out of band context\n')
973 1015 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
974 1016 headerblock = self._readpartheader()
975 1017 if headerblock is None:
976 1018 indebug(self.ui, 'no part found during interruption.')
977 1019 return
978 1020 part = unbundlepart(self.ui, headerblock, self._fp)
979 1021 op = interruptoperation(self.ui)
980 1022 _processpart(op, part)
981 1023 self.ui.debug('bundle2-input-stream-interrupt:'
982 1024 ' closing out of band context\n')
983 1025
984 1026 class interruptoperation(object):
985 1027 """A limited operation to be use by part handler during interruption
986 1028
987 1029 It only have access to an ui object.
988 1030 """
989 1031
990 1032 def __init__(self, ui):
991 1033 self.ui = ui
992 1034 self.reply = None
993 1035 self.captureoutput = False
994 1036
995 1037 @property
996 1038 def repo(self):
997 1039 raise RuntimeError('no repo access from stream interruption')
998 1040
999 1041 def gettransaction(self):
1000 1042 raise TransactionUnavailable('no repo access from stream interruption')
1001 1043
1002 1044 class unbundlepart(unpackermixin):
1003 1045 """a bundle part read from a bundle"""
1004 1046
1005 1047 def __init__(self, ui, header, fp):
1006 1048 super(unbundlepart, self).__init__(fp)
1007 1049 self.ui = ui
1008 1050 # unbundle state attr
1009 1051 self._headerdata = header
1010 1052 self._headeroffset = 0
1011 1053 self._initialized = False
1012 1054 self.consumed = False
1013 1055 # part data
1014 1056 self.id = None
1015 1057 self.type = None
1016 1058 self.mandatoryparams = None
1017 1059 self.advisoryparams = None
1018 1060 self.params = None
1019 1061 self.mandatorykeys = ()
1020 1062 self._payloadstream = None
1021 1063 self._readheader()
1022 1064 self._mandatory = None
1023 1065 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1024 1066 self._pos = 0
1025 1067
1026 1068 def _fromheader(self, size):
1027 1069 """return the next <size> byte from the header"""
1028 1070 offset = self._headeroffset
1029 1071 data = self._headerdata[offset:(offset + size)]
1030 1072 self._headeroffset = offset + size
1031 1073 return data
1032 1074
1033 1075 def _unpackheader(self, format):
1034 1076 """read given format from header
1035 1077
1036 1078 This automatically computes the size of the format to read."""
1037 1079 data = self._fromheader(struct.calcsize(format))
1038 1080 return _unpack(format, data)
1039 1081
1040 1082 def _initparams(self, mandatoryparams, advisoryparams):
1041 1083 """internal function to setup all logic related parameters"""
1042 1084 # make it read only to prevent people touching it by mistake.
1043 1085 self.mandatoryparams = tuple(mandatoryparams)
1044 1086 self.advisoryparams = tuple(advisoryparams)
1045 1087 # user friendly UI
1046 1088 self.params = dict(self.mandatoryparams)
1047 1089 self.params.update(dict(self.advisoryparams))
1048 1090 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1049 1091
1050 1092 def _payloadchunks(self, chunknum=0):
1051 1093 '''seek to specified chunk and start yielding data'''
1052 1094 if len(self._chunkindex) == 0:
1053 1095 assert chunknum == 0, 'Must start with chunk 0'
1054 1096 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1055 1097 else:
1056 1098 assert chunknum < len(self._chunkindex), \
1057 1099 'Unknown chunk %d' % chunknum
1058 1100 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1059 1101
1060 1102 pos = self._chunkindex[chunknum][0]
1061 1103 payloadsize = self._unpack(_fpayloadsize)[0]
1062 1104 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1063 1105 while payloadsize:
1064 1106 if payloadsize == flaginterrupt:
1065 1107 # interruption detection, the handler will now read a
1066 1108 # single part and process it.
1067 1109 interrupthandler(self.ui, self._fp)()
1068 1110 elif payloadsize < 0:
1069 1111 msg = 'negative payload chunk size: %i' % payloadsize
1070 1112 raise error.BundleValueError(msg)
1071 1113 else:
1072 1114 result = self._readexact(payloadsize)
1073 1115 chunknum += 1
1074 1116 pos += payloadsize
1075 1117 if chunknum == len(self._chunkindex):
1076 1118 self._chunkindex.append((pos,
1077 1119 super(unbundlepart, self).tell()))
1078 1120 yield result
1079 1121 payloadsize = self._unpack(_fpayloadsize)[0]
1080 1122 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1081 1123
1082 1124 def _findchunk(self, pos):
1083 1125 '''for a given payload position, return a chunk number and offset'''
1084 1126 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1085 1127 if ppos == pos:
1086 1128 return chunk, 0
1087 1129 elif ppos > pos:
1088 1130 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1089 1131 raise ValueError('Unknown chunk')
1090 1132
1091 1133 def _readheader(self):
1092 1134 """read the header and setup the object"""
1093 1135 typesize = self._unpackheader(_fparttypesize)[0]
1094 1136 self.type = self._fromheader(typesize)
1095 1137 indebug(self.ui, 'part type: "%s"' % self.type)
1096 1138 self.id = self._unpackheader(_fpartid)[0]
1097 1139 indebug(self.ui, 'part id: "%s"' % self.id)
1098 1140 # extract mandatory bit from type
1099 1141 self.mandatory = (self.type != self.type.lower())
1100 1142 self.type = self.type.lower()
1101 1143 ## reading parameters
1102 1144 # param count
1103 1145 mancount, advcount = self._unpackheader(_fpartparamcount)
1104 1146 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1105 1147 # param size
1106 1148 fparamsizes = _makefpartparamsizes(mancount + advcount)
1107 1149 paramsizes = self._unpackheader(fparamsizes)
1108 1150 # make it a list of couple again
1109 1151 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1110 1152 # split mandatory from advisory
1111 1153 mansizes = paramsizes[:mancount]
1112 1154 advsizes = paramsizes[mancount:]
1113 1155 # retrieve param value
1114 1156 manparams = []
1115 1157 for key, value in mansizes:
1116 1158 manparams.append((self._fromheader(key), self._fromheader(value)))
1117 1159 advparams = []
1118 1160 for key, value in advsizes:
1119 1161 advparams.append((self._fromheader(key), self._fromheader(value)))
1120 1162 self._initparams(manparams, advparams)
1121 1163 ## part payload
1122 1164 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1123 1165 # we read the data, tell it
1124 1166 self._initialized = True
1125 1167
1126 1168 def read(self, size=None):
1127 1169 """read payload data"""
1128 1170 if not self._initialized:
1129 1171 self._readheader()
1130 1172 if size is None:
1131 1173 data = self._payloadstream.read()
1132 1174 else:
1133 1175 data = self._payloadstream.read(size)
1134 1176 self._pos += len(data)
1135 1177 if size is None or len(data) < size:
1136 1178 if not self.consumed and self._pos:
1137 1179 self.ui.debug('bundle2-input-part: total payload size %i\n'
1138 1180 % self._pos)
1139 1181 self.consumed = True
1140 1182 return data
1141 1183
1142 1184 def tell(self):
1143 1185 return self._pos
1144 1186
1145 1187 def seek(self, offset, whence=0):
1146 1188 if whence == 0:
1147 1189 newpos = offset
1148 1190 elif whence == 1:
1149 1191 newpos = self._pos + offset
1150 1192 elif whence == 2:
1151 1193 if not self.consumed:
1152 1194 self.read()
1153 1195 newpos = self._chunkindex[-1][0] - offset
1154 1196 else:
1155 1197 raise ValueError('Unknown whence value: %r' % (whence,))
1156 1198
1157 1199 if newpos > self._chunkindex[-1][0] and not self.consumed:
1158 1200 self.read()
1159 1201 if not 0 <= newpos <= self._chunkindex[-1][0]:
1160 1202 raise ValueError('Offset out of range')
1161 1203
1162 1204 if self._pos != newpos:
1163 1205 chunk, internaloffset = self._findchunk(newpos)
1164 1206 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1165 1207 adjust = self.read(internaloffset)
1166 1208 if len(adjust) != internaloffset:
1167 1209 raise util.Abort(_('Seek failed\n'))
1168 1210 self._pos = newpos
1169 1211
1170 1212 # These are only the static capabilities.
1171 1213 # Check the 'getrepocaps' function for the rest.
1172 1214 capabilities = {'HG20': (),
1173 1215 'error': ('abort', 'unsupportedcontent', 'pushraced',
1174 1216 'pushkey'),
1175 1217 'listkeys': (),
1176 1218 'pushkey': (),
1177 1219 'digests': tuple(sorted(util.DIGESTS.keys())),
1178 1220 'remote-changegroup': ('http', 'https'),
1179 1221 'hgtagsfnodes': (),
1180 1222 }
1181 1223
1182 1224 def getrepocaps(repo, allowpushback=False):
1183 1225 """return the bundle2 capabilities for a given repo
1184 1226
1185 1227 Exists to allow extensions (like evolution) to mutate the capabilities.
1186 1228 """
1187 1229 caps = capabilities.copy()
1188 1230 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1189 1231 if obsolete.isenabled(repo, obsolete.exchangeopt):
1190 1232 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1191 1233 caps['obsmarkers'] = supportedformat
1192 1234 if allowpushback:
1193 1235 caps['pushback'] = ()
1194 1236 return caps
1195 1237
1196 1238 def bundle2caps(remote):
1197 1239 """return the bundle capabilities of a peer as dict"""
1198 1240 raw = remote.capable('bundle2')
1199 1241 if not raw and raw != '':
1200 1242 return {}
1201 1243 capsblob = urllib.unquote(remote.capable('bundle2'))
1202 1244 return decodecaps(capsblob)
1203 1245
1204 1246 def obsmarkersversion(caps):
1205 1247 """extract the list of supported obsmarkers versions from a bundle2caps dict
1206 1248 """
1207 1249 obscaps = caps.get('obsmarkers', ())
1208 1250 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1209 1251
1210 1252 @parthandler('changegroup', ('version', 'nbchanges'))
1211 1253 def handlechangegroup(op, inpart):
1212 1254 """apply a changegroup part on the repo
1213 1255
1214 1256 This is a very early implementation that will be massively reworked before
1215 1257 being inflicted on any end-user.
1216 1258 """
1217 1259 # Make sure we trigger a transaction creation
1218 1260 #
1219 1261 # The addchangegroup function will get a transaction object by itself, but
1220 1262 # we need to make sure we trigger the creation of a transaction object used
1221 1263 # for the whole processing scope.
1222 1264 op.gettransaction()
1223 1265 unpackerversion = inpart.params.get('version', '01')
1224 1266 # We should raise an appropriate exception here
1225 1267 unpacker = changegroup.packermap[unpackerversion][1]
1226 1268 cg = unpacker(inpart, None)
1227 1269 # the source and url passed here are overwritten by the one contained in
1228 1270 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1229 1271 nbchangesets = None
1230 1272 if 'nbchanges' in inpart.params:
1231 1273 nbchangesets = int(inpart.params.get('nbchanges'))
1232 1274 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1233 1275 expectedtotal=nbchangesets)
1234 1276 op.records.add('changegroup', {'return': ret})
1235 1277 if op.reply is not None:
1236 1278 # This is definitely not the final form of this
1237 1279 # return. But one need to start somewhere.
1238 1280 part = op.reply.newpart('reply:changegroup', mandatory=False)
1239 1281 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1240 1282 part.addparam('return', '%i' % ret, mandatory=False)
1241 1283 assert not inpart.read()
1242 1284
1243 1285 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1244 1286 ['digest:%s' % k for k in util.DIGESTS.keys()])
1245 1287 @parthandler('remote-changegroup', _remotechangegroupparams)
1246 1288 def handleremotechangegroup(op, inpart):
1247 1289 """apply a bundle10 on the repo, given an url and validation information
1248 1290
1249 1291 All the information about the remote bundle to import is given as
1250 1292 parameters. The parameters include:
1251 1293 - url: the url to the bundle10.
1252 1294 - size: the bundle10 file size. It is used to validate what was
1253 1295 retrieved by the client matches the server knowledge about the bundle.
1254 1296 - digests: a space separated list of the digest types provided as
1255 1297 parameters.
1256 1298 - digest:<digest-type>: the hexadecimal representation of the digest with
1257 1299 that name. Like the size, it is used to validate what was retrieved by
1258 1300 the client matches what the server knows about the bundle.
1259 1301
1260 1302 When multiple digest types are given, all of them are checked.
1261 1303 """
1262 1304 try:
1263 1305 raw_url = inpart.params['url']
1264 1306 except KeyError:
1265 1307 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1266 1308 parsed_url = util.url(raw_url)
1267 1309 if parsed_url.scheme not in capabilities['remote-changegroup']:
1268 1310 raise util.Abort(_('remote-changegroup does not support %s urls') %
1269 1311 parsed_url.scheme)
1270 1312
1271 1313 try:
1272 1314 size = int(inpart.params['size'])
1273 1315 except ValueError:
1274 1316 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1275 1317 % 'size')
1276 1318 except KeyError:
1277 1319 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1278 1320
1279 1321 digests = {}
1280 1322 for typ in inpart.params.get('digests', '').split():
1281 1323 param = 'digest:%s' % typ
1282 1324 try:
1283 1325 value = inpart.params[param]
1284 1326 except KeyError:
1285 1327 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1286 1328 param)
1287 1329 digests[typ] = value
1288 1330
1289 1331 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1290 1332
1291 1333 # Make sure we trigger a transaction creation
1292 1334 #
1293 1335 # The addchangegroup function will get a transaction object by itself, but
1294 1336 # we need to make sure we trigger the creation of a transaction object used
1295 1337 # for the whole processing scope.
1296 1338 op.gettransaction()
1297 1339 from . import exchange
1298 1340 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1299 1341 if not isinstance(cg, changegroup.cg1unpacker):
1300 1342 raise util.Abort(_('%s: not a bundle version 1.0') %
1301 1343 util.hidepassword(raw_url))
1302 1344 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1303 1345 op.records.add('changegroup', {'return': ret})
1304 1346 if op.reply is not None:
1305 1347 # This is definitely not the final form of this
1306 1348 # return. But one need to start somewhere.
1307 1349 part = op.reply.newpart('reply:changegroup')
1308 1350 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1309 1351 part.addparam('return', '%i' % ret, mandatory=False)
1310 1352 try:
1311 1353 real_part.validate()
1312 1354 except util.Abort as e:
1313 1355 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1314 1356 (util.hidepassword(raw_url), str(e)))
1315 1357 assert not inpart.read()
1316 1358
1317 1359 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1318 1360 def handlereplychangegroup(op, inpart):
1319 1361 ret = int(inpart.params['return'])
1320 1362 replyto = int(inpart.params['in-reply-to'])
1321 1363 op.records.add('changegroup', {'return': ret}, replyto)
1322 1364
1323 1365 @parthandler('check:heads')
1324 1366 def handlecheckheads(op, inpart):
1325 1367 """check that head of the repo did not change
1326 1368
1327 1369 This is used to detect a push race when using unbundle.
1328 1370 This replaces the "heads" argument of unbundle."""
1329 1371 h = inpart.read(20)
1330 1372 heads = []
1331 1373 while len(h) == 20:
1332 1374 heads.append(h)
1333 1375 h = inpart.read(20)
1334 1376 assert not h
1335 1377 if heads != op.repo.heads():
1336 1378 raise error.PushRaced('repository changed while pushing - '
1337 1379 'please try again')
1338 1380
1339 1381 @parthandler('output')
1340 1382 def handleoutput(op, inpart):
1341 1383 """forward output captured on the server to the client"""
1342 1384 for line in inpart.read().splitlines():
1343 1385 op.ui.status(('remote: %s\n' % line))
1344 1386
1345 1387 @parthandler('replycaps')
1346 1388 def handlereplycaps(op, inpart):
1347 1389 """Notify that a reply bundle should be created
1348 1390
1349 1391 The payload contains the capabilities information for the reply"""
1350 1392 caps = decodecaps(inpart.read())
1351 1393 if op.reply is None:
1352 1394 op.reply = bundle20(op.ui, caps)
1353 1395
1354 1396 @parthandler('error:abort', ('message', 'hint'))
1355 1397 def handleerrorabort(op, inpart):
1356 1398 """Used to transmit abort error over the wire"""
1357 1399 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1358 1400
1359 1401 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1360 1402 'in-reply-to'))
1361 1403 def handleerrorpushkey(op, inpart):
1362 1404 """Used to transmit failure of a mandatory pushkey over the wire"""
1363 1405 kwargs = {}
1364 1406 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1365 1407 value = inpart.params.get(name)
1366 1408 if value is not None:
1367 1409 kwargs[name] = value
1368 1410 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1369 1411
1370 1412 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1371 1413 def handleerrorunsupportedcontent(op, inpart):
1372 1414 """Used to transmit unknown content error over the wire"""
1373 1415 kwargs = {}
1374 1416 parttype = inpart.params.get('parttype')
1375 1417 if parttype is not None:
1376 1418 kwargs['parttype'] = parttype
1377 1419 params = inpart.params.get('params')
1378 1420 if params is not None:
1379 1421 kwargs['params'] = params.split('\0')
1380 1422
1381 1423 raise error.BundleUnknownFeatureError(**kwargs)
1382 1424
1383 1425 @parthandler('error:pushraced', ('message',))
1384 1426 def handleerrorpushraced(op, inpart):
1385 1427 """Used to transmit push race error over the wire"""
1386 1428 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1387 1429
1388 1430 @parthandler('listkeys', ('namespace',))
1389 1431 def handlelistkeys(op, inpart):
1390 1432 """retrieve pushkey namespace content stored in a bundle2"""
1391 1433 namespace = inpart.params['namespace']
1392 1434 r = pushkey.decodekeys(inpart.read())
1393 1435 op.records.add('listkeys', (namespace, r))
1394 1436
1395 1437 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1396 1438 def handlepushkey(op, inpart):
1397 1439 """process a pushkey request"""
1398 1440 dec = pushkey.decode
1399 1441 namespace = dec(inpart.params['namespace'])
1400 1442 key = dec(inpart.params['key'])
1401 1443 old = dec(inpart.params['old'])
1402 1444 new = dec(inpart.params['new'])
1403 1445 ret = op.repo.pushkey(namespace, key, old, new)
1404 1446 record = {'namespace': namespace,
1405 1447 'key': key,
1406 1448 'old': old,
1407 1449 'new': new}
1408 1450 op.records.add('pushkey', record)
1409 1451 if op.reply is not None:
1410 1452 rpart = op.reply.newpart('reply:pushkey')
1411 1453 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1412 1454 rpart.addparam('return', '%i' % ret, mandatory=False)
1413 1455 if inpart.mandatory and not ret:
1414 1456 kwargs = {}
1415 1457 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1416 1458 if key in inpart.params:
1417 1459 kwargs[key] = inpart.params[key]
1418 1460 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1419 1461
1420 1462 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1421 1463 def handlepushkeyreply(op, inpart):
1422 1464 """retrieve the result of a pushkey request"""
1423 1465 ret = int(inpart.params['return'])
1424 1466 partid = int(inpart.params['in-reply-to'])
1425 1467 op.records.add('pushkey', {'return': ret}, partid)
1426 1468
1427 1469 @parthandler('obsmarkers')
1428 1470 def handleobsmarker(op, inpart):
1429 1471 """add a stream of obsmarkers to the repo"""
1430 1472 tr = op.gettransaction()
1431 1473 markerdata = inpart.read()
1432 1474 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1433 1475 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1434 1476 % len(markerdata))
1435 1477 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1436 1478 if new:
1437 1479 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1438 1480 op.records.add('obsmarkers', {'new': new})
1439 1481 if op.reply is not None:
1440 1482 rpart = op.reply.newpart('reply:obsmarkers')
1441 1483 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1442 1484 rpart.addparam('new', '%i' % new, mandatory=False)
1443 1485
1444 1486
1445 1487 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1446 1488 def handleobsmarkerreply(op, inpart):
1447 1489 """retrieve the result of a pushkey request"""
1448 1490 ret = int(inpart.params['new'])
1449 1491 partid = int(inpart.params['in-reply-to'])
1450 1492 op.records.add('obsmarkers', {'new': ret}, partid)
1451 1493
1452 1494 @parthandler('hgtagsfnodes')
1453 1495 def handlehgtagsfnodes(op, inpart):
1454 1496 """Applies .hgtags fnodes cache entries to the local repo.
1455 1497
1456 1498 Payload is pairs of 20 byte changeset nodes and filenodes.
1457 1499 """
1458 1500 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1459 1501
1460 1502 count = 0
1461 1503 while True:
1462 1504 node = inpart.read(20)
1463 1505 fnode = inpart.read(20)
1464 1506 if len(node) < 20 or len(fnode) < 20:
1465 1507 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1466 1508 break
1467 1509 cache.setfnode(node, fnode)
1468 1510 count += 1
1469 1511
1470 1512 cache.write()
1471 1513 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)