##// END OF EJS Templates
bundle2: split parameter retrieval and processing...
Pierre-Yves David -
r26541:d40029b4 default
parent child Browse files
Show More
@@ -1,1463 +1,1471 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155 import urllib
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 obsolete,
162 162 pushkey,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 _pack = struct.pack
169 169 _unpack = struct.unpack
170 170
171 171 _fstreamparamsize = '>i'
172 172 _fpartheadersize = '>i'
173 173 _fparttypesize = '>B'
174 174 _fpartid = '>I'
175 175 _fpayloadsize = '>i'
176 176 _fpartparamcount = '>BB'
177 177
178 178 preferedchunksize = 4096
179 179
180 180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181 181
182 182 def outdebug(ui, message):
183 183 """debug regarding output stream (bundling)"""
184 184 if ui.configbool('devel', 'bundle2.debug', False):
185 185 ui.debug('bundle2-output: %s\n' % message)
186 186
187 187 def indebug(ui, message):
188 188 """debug on input stream (unbundling)"""
189 189 if ui.configbool('devel', 'bundle2.debug', False):
190 190 ui.debug('bundle2-input: %s\n' % message)
191 191
192 192 def validateparttype(parttype):
193 193 """raise ValueError if a parttype contains invalid character"""
194 194 if _parttypeforbidden.search(parttype):
195 195 raise ValueError(parttype)
196 196
197 197 def _makefpartparamsizes(nbparams):
198 198 """return a struct format to read part parameter sizes
199 199
200 200 The number parameters is variable so we need to build that format
201 201 dynamically.
202 202 """
203 203 return '>'+('BB'*nbparams)
204 204
205 205 parthandlermapping = {}
206 206
207 207 def parthandler(parttype, params=()):
208 208 """decorator that register a function as a bundle2 part handler
209 209
210 210 eg::
211 211
212 212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 213 def myparttypehandler(...):
214 214 '''process a part of type "my part".'''
215 215 ...
216 216 """
217 217 validateparttype(parttype)
218 218 def _decorator(func):
219 219 lparttype = parttype.lower() # enforce lower case matching.
220 220 assert lparttype not in parthandlermapping
221 221 parthandlermapping[lparttype] = func
222 222 func.params = frozenset(params)
223 223 return func
224 224 return _decorator
225 225
226 226 class unbundlerecords(object):
227 227 """keep record of what happens during and unbundle
228 228
229 229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
230 230 category of record and obj is an arbitrary object.
231 231
232 232 `records['cat']` will return all entries of this category 'cat'.
233 233
234 234 Iterating on the object itself will yield `('category', obj)` tuples
235 235 for all entries.
236 236
237 237 All iterations happens in chronological order.
238 238 """
239 239
240 240 def __init__(self):
241 241 self._categories = {}
242 242 self._sequences = []
243 243 self._replies = {}
244 244
245 245 def add(self, category, entry, inreplyto=None):
246 246 """add a new record of a given category.
247 247
248 248 The entry can then be retrieved in the list returned by
249 249 self['category']."""
250 250 self._categories.setdefault(category, []).append(entry)
251 251 self._sequences.append((category, entry))
252 252 if inreplyto is not None:
253 253 self.getreplies(inreplyto).add(category, entry)
254 254
255 255 def getreplies(self, partid):
256 256 """get the records that are replies to a specific part"""
257 257 return self._replies.setdefault(partid, unbundlerecords())
258 258
259 259 def __getitem__(self, cat):
260 260 return tuple(self._categories.get(cat, ()))
261 261
262 262 def __iter__(self):
263 263 return iter(self._sequences)
264 264
265 265 def __len__(self):
266 266 return len(self._sequences)
267 267
268 268 def __nonzero__(self):
269 269 return bool(self._sequences)
270 270
271 271 class bundleoperation(object):
272 272 """an object that represents a single bundling process
273 273
274 274 Its purpose is to carry unbundle-related objects and states.
275 275
276 276 A new object should be created at the beginning of each bundle processing.
277 277 The object is to be returned by the processing function.
278 278
279 279 The object has very little content now it will ultimately contain:
280 280 * an access to the repo the bundle is applied to,
281 281 * a ui object,
282 282 * a way to retrieve a transaction to add changes to the repo,
283 283 * a way to record the result of processing each part,
284 284 * a way to construct a bundle response when applicable.
285 285 """
286 286
287 287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 288 self.repo = repo
289 289 self.ui = repo.ui
290 290 self.records = unbundlerecords()
291 291 self.gettransaction = transactiongetter
292 292 self.reply = None
293 293 self.captureoutput = captureoutput
294 294
295 295 class TransactionUnavailable(RuntimeError):
296 296 pass
297 297
298 298 def _notransaction():
299 299 """default method to get a transaction while processing a bundle
300 300
301 301 Raise an exception to highlight the fact that no transaction was expected
302 302 to be created"""
303 303 raise TransactionUnavailable()
304 304
305 305 def processbundle(repo, unbundler, transactiongetter=None, op=None):
306 306 """This function process a bundle, apply effect to/from a repo
307 307
308 308 It iterates over each part then searches for and uses the proper handling
309 309 code to process the part. Parts are processed in order.
310 310
311 311 This is very early version of this function that will be strongly reworked
312 312 before final usage.
313 313
314 314 Unknown Mandatory part will abort the process.
315 315
316 316 It is temporarily possible to provide a prebuilt bundleoperation to the
317 317 function. This is used to ensure output is properly propagated in case of
318 318 an error during the unbundling. This output capturing part will likely be
319 319 reworked and this ability will probably go away in the process.
320 320 """
321 321 if op is None:
322 322 if transactiongetter is None:
323 323 transactiongetter = _notransaction
324 324 op = bundleoperation(repo, transactiongetter)
325 325 # todo:
326 326 # - replace this is a init function soon.
327 327 # - exception catching
328 328 unbundler.params
329 329 if repo.ui.debugflag:
330 330 msg = ['bundle2-input-bundle:']
331 331 if unbundler.params:
332 332 msg.append(' %i params')
333 333 if op.gettransaction is None:
334 334 msg.append(' no-transaction')
335 335 else:
336 336 msg.append(' with-transaction')
337 337 msg.append('\n')
338 338 repo.ui.debug(''.join(msg))
339 339 iterparts = enumerate(unbundler.iterparts())
340 340 part = None
341 341 nbpart = 0
342 342 try:
343 343 for nbpart, part in iterparts:
344 344 _processpart(op, part)
345 345 except BaseException as exc:
346 346 for nbpart, part in iterparts:
347 347 # consume the bundle content
348 348 part.seek(0, 2)
349 349 # Small hack to let caller code distinguish exceptions from bundle2
350 350 # processing from processing the old format. This is mostly
351 351 # needed to handle different return codes to unbundle according to the
352 352 # type of bundle. We should probably clean up or drop this return code
353 353 # craziness in a future version.
354 354 exc.duringunbundle2 = True
355 355 salvaged = []
356 356 replycaps = None
357 357 if op.reply is not None:
358 358 salvaged = op.reply.salvageoutput()
359 359 replycaps = op.reply.capabilities
360 360 exc._replycaps = replycaps
361 361 exc._bundle2salvagedoutput = salvaged
362 362 raise
363 363 finally:
364 364 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
365 365
366 366 return op
367 367
368 368 def _processpart(op, part):
369 369 """process a single part from a bundle
370 370
371 371 The part is guaranteed to have been fully consumed when the function exits
372 372 (even if an exception is raised)."""
373 373 status = 'unknown' # used by debug output
374 374 try:
375 375 try:
376 376 handler = parthandlermapping.get(part.type)
377 377 if handler is None:
378 378 status = 'unsupported-type'
379 379 raise error.BundleUnknownFeatureError(parttype=part.type)
380 380 indebug(op.ui, 'found a handler for part %r' % part.type)
381 381 unknownparams = part.mandatorykeys - handler.params
382 382 if unknownparams:
383 383 unknownparams = list(unknownparams)
384 384 unknownparams.sort()
385 385 status = 'unsupported-params (%s)' % unknownparams
386 386 raise error.BundleUnknownFeatureError(parttype=part.type,
387 387 params=unknownparams)
388 388 status = 'supported'
389 389 except error.BundleUnknownFeatureError as exc:
390 390 if part.mandatory: # mandatory parts
391 391 raise
392 392 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
393 393 return # skip to part processing
394 394 finally:
395 395 if op.ui.debugflag:
396 396 msg = ['bundle2-input-part: "%s"' % part.type]
397 397 if not part.mandatory:
398 398 msg.append(' (advisory)')
399 399 nbmp = len(part.mandatorykeys)
400 400 nbap = len(part.params) - nbmp
401 401 if nbmp or nbap:
402 402 msg.append(' (params:')
403 403 if nbmp:
404 404 msg.append(' %i mandatory' % nbmp)
405 405 if nbap:
406 406 msg.append(' %i advisory' % nbmp)
407 407 msg.append(')')
408 408 msg.append(' %s\n' % status)
409 409 op.ui.debug(''.join(msg))
410 410
411 411 # handler is called outside the above try block so that we don't
412 412 # risk catching KeyErrors from anything other than the
413 413 # parthandlermapping lookup (any KeyError raised by handler()
414 414 # itself represents a defect of a different variety).
415 415 output = None
416 416 if op.captureoutput and op.reply is not None:
417 417 op.ui.pushbuffer(error=True, subproc=True)
418 418 output = ''
419 419 try:
420 420 handler(op, part)
421 421 finally:
422 422 if output is not None:
423 423 output = op.ui.popbuffer()
424 424 if output:
425 425 outpart = op.reply.newpart('output', data=output,
426 426 mandatory=False)
427 427 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
428 428 finally:
429 429 # consume the part content to not corrupt the stream.
430 430 part.seek(0, 2)
431 431
432 432
433 433 def decodecaps(blob):
434 434 """decode a bundle2 caps bytes blob into a dictionary
435 435
436 436 The blob is a list of capabilities (one per line)
437 437 Capabilities may have values using a line of the form::
438 438
439 439 capability=value1,value2,value3
440 440
441 441 The values are always a list."""
442 442 caps = {}
443 443 for line in blob.splitlines():
444 444 if not line:
445 445 continue
446 446 if '=' not in line:
447 447 key, vals = line, ()
448 448 else:
449 449 key, vals = line.split('=', 1)
450 450 vals = vals.split(',')
451 451 key = urllib.unquote(key)
452 452 vals = [urllib.unquote(v) for v in vals]
453 453 caps[key] = vals
454 454 return caps
455 455
456 456 def encodecaps(caps):
457 457 """encode a bundle2 caps dictionary into a bytes blob"""
458 458 chunks = []
459 459 for ca in sorted(caps):
460 460 vals = caps[ca]
461 461 ca = urllib.quote(ca)
462 462 vals = [urllib.quote(v) for v in vals]
463 463 if vals:
464 464 ca = "%s=%s" % (ca, ','.join(vals))
465 465 chunks.append(ca)
466 466 return '\n'.join(chunks)
467 467
468 468 class bundle20(object):
469 469 """represent an outgoing bundle2 container
470 470
471 471 Use the `addparam` method to add stream level parameter. and `newpart` to
472 472 populate it. Then call `getchunks` to retrieve all the binary chunks of
473 473 data that compose the bundle2 container."""
474 474
475 475 _magicstring = 'HG20'
476 476
477 477 def __init__(self, ui, capabilities=()):
478 478 self.ui = ui
479 479 self._params = []
480 480 self._parts = []
481 481 self.capabilities = dict(capabilities)
482 482 self._compressor = util.compressors[None]()
483 483
484 484 def setcompression(self, alg):
485 485 """setup core part compression to <alg>"""
486 486 if alg is None:
487 487 return
488 488 assert not any(n.lower() == 'Compression' for n, v in self._params)
489 489 self.addparam('Compression', alg)
490 490 self._compressor = util.compressors[alg]()
491 491
492 492 @property
493 493 def nbparts(self):
494 494 """total number of parts added to the bundler"""
495 495 return len(self._parts)
496 496
497 497 # methods used to defines the bundle2 content
498 498 def addparam(self, name, value=None):
499 499 """add a stream level parameter"""
500 500 if not name:
501 501 raise ValueError('empty parameter name')
502 502 if name[0] not in string.letters:
503 503 raise ValueError('non letter first character: %r' % name)
504 504 self._params.append((name, value))
505 505
506 506 def addpart(self, part):
507 507 """add a new part to the bundle2 container
508 508
509 509 Parts contains the actual applicative payload."""
510 510 assert part.id is None
511 511 part.id = len(self._parts) # very cheap counter
512 512 self._parts.append(part)
513 513
514 514 def newpart(self, typeid, *args, **kwargs):
515 515 """create a new part and add it to the containers
516 516
517 517 As the part is directly added to the containers. For now, this means
518 518 that any failure to properly initialize the part after calling
519 519 ``newpart`` should result in a failure of the whole bundling process.
520 520
521 521 You can still fall back to manually create and add if you need better
522 522 control."""
523 523 part = bundlepart(typeid, *args, **kwargs)
524 524 self.addpart(part)
525 525 return part
526 526
527 527 # methods used to generate the bundle2 stream
528 528 def getchunks(self):
529 529 if self.ui.debugflag:
530 530 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
531 531 if self._params:
532 532 msg.append(' (%i params)' % len(self._params))
533 533 msg.append(' %i parts total\n' % len(self._parts))
534 534 self.ui.debug(''.join(msg))
535 535 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
536 536 yield self._magicstring
537 537 param = self._paramchunk()
538 538 outdebug(self.ui, 'bundle parameter: %s' % param)
539 539 yield _pack(_fstreamparamsize, len(param))
540 540 if param:
541 541 yield param
542 542 # starting compression
543 543 for chunk in self._getcorechunk():
544 544 yield self._compressor.compress(chunk)
545 545 yield self._compressor.flush()
546 546
547 547 def _paramchunk(self):
548 548 """return a encoded version of all stream parameters"""
549 549 blocks = []
550 550 for par, value in self._params:
551 551 par = urllib.quote(par)
552 552 if value is not None:
553 553 value = urllib.quote(value)
554 554 par = '%s=%s' % (par, value)
555 555 blocks.append(par)
556 556 return ' '.join(blocks)
557 557
558 558 def _getcorechunk(self):
559 559 """yield chunk for the core part of the bundle
560 560
561 561 (all but headers and parameters)"""
562 562 outdebug(self.ui, 'start of parts')
563 563 for part in self._parts:
564 564 outdebug(self.ui, 'bundle part: "%s"' % part.type)
565 565 for chunk in part.getchunks(ui=self.ui):
566 566 yield chunk
567 567 outdebug(self.ui, 'end of bundle')
568 568 yield _pack(_fpartheadersize, 0)
569 569
570 570
571 571 def salvageoutput(self):
572 572 """return a list with a copy of all output parts in the bundle
573 573
574 574 This is meant to be used during error handling to make sure we preserve
575 575 server output"""
576 576 salvaged = []
577 577 for part in self._parts:
578 578 if part.type.startswith('output'):
579 579 salvaged.append(part.copy())
580 580 return salvaged
581 581
582 582
583 583 class unpackermixin(object):
584 584 """A mixin to extract bytes and struct data from a stream"""
585 585
586 586 def __init__(self, fp):
587 587 self._fp = fp
588 588 self._seekable = (util.safehasattr(fp, 'seek') and
589 589 util.safehasattr(fp, 'tell'))
590 590
591 591 def _unpack(self, format):
592 592 """unpack this struct format from the stream"""
593 593 data = self._readexact(struct.calcsize(format))
594 594 return _unpack(format, data)
595 595
596 596 def _readexact(self, size):
597 597 """read exactly <size> bytes from the stream"""
598 598 return changegroup.readexactly(self._fp, size)
599 599
600 600 def seek(self, offset, whence=0):
601 601 """move the underlying file pointer"""
602 602 if self._seekable:
603 603 return self._fp.seek(offset, whence)
604 604 else:
605 605 raise NotImplementedError(_('File pointer is not seekable'))
606 606
607 607 def tell(self):
608 608 """return the file offset, or None if file is not seekable"""
609 609 if self._seekable:
610 610 try:
611 611 return self._fp.tell()
612 612 except IOError as e:
613 613 if e.errno == errno.ESPIPE:
614 614 self._seekable = False
615 615 else:
616 616 raise
617 617 return None
618 618
619 619 def close(self):
620 620 """close underlying file"""
621 621 if util.safehasattr(self._fp, 'close'):
622 622 return self._fp.close()
623 623
624 624 def getunbundler(ui, fp, magicstring=None):
625 625 """return a valid unbundler object for a given magicstring"""
626 626 if magicstring is None:
627 627 magicstring = changegroup.readexactly(fp, 4)
628 628 magic, version = magicstring[0:2], magicstring[2:4]
629 629 if magic != 'HG':
630 630 raise util.Abort(_('not a Mercurial bundle'))
631 631 unbundlerclass = formatmap.get(version)
632 632 if unbundlerclass is None:
633 633 raise util.Abort(_('unknown bundle version %s') % version)
634 634 unbundler = unbundlerclass(ui, fp)
635 635 indebug(ui, 'start processing of %s stream' % magicstring)
636 636 return unbundler
637 637
638 638 class unbundle20(unpackermixin):
639 639 """interpret a bundle2 stream
640 640
641 641 This class is fed with a binary stream and yields parts through its
642 642 `iterparts` methods."""
643 643
644 644 def __init__(self, ui, fp):
645 645 """If header is specified, we do not read it out of the stream."""
646 646 self.ui = ui
647 647 self._decompressor = util.decompressors[None]
648 648 super(unbundle20, self).__init__(fp)
649 649
650 650 @util.propertycache
651 651 def params(self):
652 652 """dictionary of stream level parameters"""
653 653 indebug(self.ui, 'reading bundle2 stream parameters')
654 654 params = {}
655 655 paramssize = self._unpack(_fstreamparamsize)[0]
656 656 if paramssize < 0:
657 657 raise error.BundleValueError('negative bundle param size: %i'
658 658 % paramssize)
659 659 if paramssize:
660 for p in self._readexact(paramssize).split(' '):
661 p = p.split('=', 1)
662 p = [urllib.unquote(i) for i in p]
663 if len(p) < 2:
664 p.append(None)
665 self._processparam(*p)
666 params[p[0]] = p[1]
660 params = self._readexact(paramssize)
661 params = self._processallparams(params)
667 662 return params
668 663
664 def _processallparams(self, paramsblock):
665 """"""
666 params = {}
667 for p in paramsblock.split(' '):
668 p = p.split('=', 1)
669 p = [urllib.unquote(i) for i in p]
670 if len(p) < 2:
671 p.append(None)
672 self._processparam(*p)
673 params[p[0]] = p[1]
674 return params
675
676
669 677 def _processparam(self, name, value):
670 678 """process a parameter, applying its effect if needed
671 679
672 680 Parameter starting with a lower case letter are advisory and will be
673 681 ignored when unknown. Those starting with an upper case letter are
674 682 mandatory and will this function will raise a KeyError when unknown.
675 683
676 684 Note: no option are currently supported. Any input will be either
677 685 ignored or failing.
678 686 """
679 687 if not name:
680 688 raise ValueError('empty parameter name')
681 689 if name[0] not in string.letters:
682 690 raise ValueError('non letter first character: %r' % name)
683 691 try:
684 692 handler = b2streamparamsmap[name.lower()]
685 693 except KeyError:
686 694 if name[0].islower():
687 695 indebug(self.ui, "ignoring unknown parameter %r" % name)
688 696 else:
689 697 raise error.BundleUnknownFeatureError(params=(name,))
690 698 else:
691 699 handler(self, name, value)
692 700
693 701 def iterparts(self):
694 702 """yield all parts contained in the stream"""
695 703 # make sure param have been loaded
696 704 self.params
697 705 # From there, payload need to be decompressed
698 706 self._fp = self._decompressor(self._fp)
699 707 indebug(self.ui, 'start extraction of bundle2 parts')
700 708 headerblock = self._readpartheader()
701 709 while headerblock is not None:
702 710 part = unbundlepart(self.ui, headerblock, self._fp)
703 711 yield part
704 712 part.seek(0, 2)
705 713 headerblock = self._readpartheader()
706 714 indebug(self.ui, 'end of bundle2 stream')
707 715
708 716 def _readpartheader(self):
709 717 """reads a part header size and return the bytes blob
710 718
711 719 returns None if empty"""
712 720 headersize = self._unpack(_fpartheadersize)[0]
713 721 if headersize < 0:
714 722 raise error.BundleValueError('negative part header size: %i'
715 723 % headersize)
716 724 indebug(self.ui, 'part header size: %i' % headersize)
717 725 if headersize:
718 726 return self._readexact(headersize)
719 727 return None
720 728
721 729 def compressed(self):
722 730 return False
723 731
724 732 formatmap = {'20': unbundle20}
725 733
726 734 b2streamparamsmap = {}
727 735
728 736 def b2streamparamhandler(name):
729 737 """register a handler for a stream level parameter"""
730 738 def decorator(func):
731 739 assert name not in formatmap
732 740 b2streamparamsmap[name] = func
733 741 return func
734 742 return decorator
735 743
736 744 @b2streamparamhandler('compression')
737 745 def processcompression(unbundler, param, value):
738 746 """read compression parameter and install payload decompression"""
739 747 if value not in util.decompressors:
740 748 raise error.BundleUnknownFeatureError(params=(param,),
741 749 values=(value,))
742 750 unbundler._decompressor = util.decompressors[value]
743 751
744 752 class bundlepart(object):
745 753 """A bundle2 part contains application level payload
746 754
747 755 The part `type` is used to route the part to the application level
748 756 handler.
749 757
750 758 The part payload is contained in ``part.data``. It could be raw bytes or a
751 759 generator of byte chunks.
752 760
753 761 You can add parameters to the part using the ``addparam`` method.
754 762 Parameters can be either mandatory (default) or advisory. Remote side
755 763 should be able to safely ignore the advisory ones.
756 764
757 765 Both data and parameters cannot be modified after the generation has begun.
758 766 """
759 767
760 768 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
761 769 data='', mandatory=True):
762 770 validateparttype(parttype)
763 771 self.id = None
764 772 self.type = parttype
765 773 self._data = data
766 774 self._mandatoryparams = list(mandatoryparams)
767 775 self._advisoryparams = list(advisoryparams)
768 776 # checking for duplicated entries
769 777 self._seenparams = set()
770 778 for pname, __ in self._mandatoryparams + self._advisoryparams:
771 779 if pname in self._seenparams:
772 780 raise RuntimeError('duplicated params: %s' % pname)
773 781 self._seenparams.add(pname)
774 782 # status of the part's generation:
775 783 # - None: not started,
776 784 # - False: currently generated,
777 785 # - True: generation done.
778 786 self._generated = None
779 787 self.mandatory = mandatory
780 788
781 789 def copy(self):
782 790 """return a copy of the part
783 791
784 792 The new part have the very same content but no partid assigned yet.
785 793 Parts with generated data cannot be copied."""
786 794 assert not util.safehasattr(self.data, 'next')
787 795 return self.__class__(self.type, self._mandatoryparams,
788 796 self._advisoryparams, self._data, self.mandatory)
789 797
790 798 # methods used to defines the part content
791 799 def __setdata(self, data):
792 800 if self._generated is not None:
793 801 raise error.ReadOnlyPartError('part is being generated')
794 802 self._data = data
795 803 def __getdata(self):
796 804 return self._data
797 805 data = property(__getdata, __setdata)
798 806
799 807 @property
800 808 def mandatoryparams(self):
801 809 # make it an immutable tuple to force people through ``addparam``
802 810 return tuple(self._mandatoryparams)
803 811
804 812 @property
805 813 def advisoryparams(self):
806 814 # make it an immutable tuple to force people through ``addparam``
807 815 return tuple(self._advisoryparams)
808 816
809 817 def addparam(self, name, value='', mandatory=True):
810 818 if self._generated is not None:
811 819 raise error.ReadOnlyPartError('part is being generated')
812 820 if name in self._seenparams:
813 821 raise ValueError('duplicated params: %s' % name)
814 822 self._seenparams.add(name)
815 823 params = self._advisoryparams
816 824 if mandatory:
817 825 params = self._mandatoryparams
818 826 params.append((name, value))
819 827
820 828 # methods used to generates the bundle2 stream
821 829 def getchunks(self, ui):
822 830 if self._generated is not None:
823 831 raise RuntimeError('part can only be consumed once')
824 832 self._generated = False
825 833
826 834 if ui.debugflag:
827 835 msg = ['bundle2-output-part: "%s"' % self.type]
828 836 if not self.mandatory:
829 837 msg.append(' (advisory)')
830 838 nbmp = len(self.mandatoryparams)
831 839 nbap = len(self.advisoryparams)
832 840 if nbmp or nbap:
833 841 msg.append(' (params:')
834 842 if nbmp:
835 843 msg.append(' %i mandatory' % nbmp)
836 844 if nbap:
837 845 msg.append(' %i advisory' % nbmp)
838 846 msg.append(')')
839 847 if not self.data:
840 848 msg.append(' empty payload')
841 849 elif util.safehasattr(self.data, 'next'):
842 850 msg.append(' streamed payload')
843 851 else:
844 852 msg.append(' %i bytes payload' % len(self.data))
845 853 msg.append('\n')
846 854 ui.debug(''.join(msg))
847 855
848 856 #### header
849 857 if self.mandatory:
850 858 parttype = self.type.upper()
851 859 else:
852 860 parttype = self.type.lower()
853 861 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
854 862 ## parttype
855 863 header = [_pack(_fparttypesize, len(parttype)),
856 864 parttype, _pack(_fpartid, self.id),
857 865 ]
858 866 ## parameters
859 867 # count
860 868 manpar = self.mandatoryparams
861 869 advpar = self.advisoryparams
862 870 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
863 871 # size
864 872 parsizes = []
865 873 for key, value in manpar:
866 874 parsizes.append(len(key))
867 875 parsizes.append(len(value))
868 876 for key, value in advpar:
869 877 parsizes.append(len(key))
870 878 parsizes.append(len(value))
871 879 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
872 880 header.append(paramsizes)
873 881 # key, value
874 882 for key, value in manpar:
875 883 header.append(key)
876 884 header.append(value)
877 885 for key, value in advpar:
878 886 header.append(key)
879 887 header.append(value)
880 888 ## finalize header
881 889 headerchunk = ''.join(header)
882 890 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
883 891 yield _pack(_fpartheadersize, len(headerchunk))
884 892 yield headerchunk
885 893 ## payload
886 894 try:
887 895 for chunk in self._payloadchunks():
888 896 outdebug(ui, 'payload chunk size: %i' % len(chunk))
889 897 yield _pack(_fpayloadsize, len(chunk))
890 898 yield chunk
891 899 except GeneratorExit:
892 900 # GeneratorExit means that nobody is listening for our
893 901 # results anyway, so just bail quickly rather than trying
894 902 # to produce an error part.
895 903 ui.debug('bundle2-generatorexit\n')
896 904 raise
897 905 except BaseException as exc:
898 906 # backup exception data for later
899 907 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
900 908 % exc)
901 909 exc_info = sys.exc_info()
902 910 msg = 'unexpected error: %s' % exc
903 911 interpart = bundlepart('error:abort', [('message', msg)],
904 912 mandatory=False)
905 913 interpart.id = 0
906 914 yield _pack(_fpayloadsize, -1)
907 915 for chunk in interpart.getchunks(ui=ui):
908 916 yield chunk
909 917 outdebug(ui, 'closing payload chunk')
910 918 # abort current part payload
911 919 yield _pack(_fpayloadsize, 0)
912 920 raise exc_info[0], exc_info[1], exc_info[2]
913 921 # end of payload
914 922 outdebug(ui, 'closing payload chunk')
915 923 yield _pack(_fpayloadsize, 0)
916 924 self._generated = True
917 925
918 926 def _payloadchunks(self):
919 927 """yield chunks of a the part payload
920 928
921 929 Exists to handle the different methods to provide data to a part."""
922 930 # we only support fixed size data now.
923 931 # This will be improved in the future.
924 932 if util.safehasattr(self.data, 'next'):
925 933 buff = util.chunkbuffer(self.data)
926 934 chunk = buff.read(preferedchunksize)
927 935 while chunk:
928 936 yield chunk
929 937 chunk = buff.read(preferedchunksize)
930 938 elif len(self.data):
931 939 yield self.data
932 940
933 941
934 942 flaginterrupt = -1
935 943
936 944 class interrupthandler(unpackermixin):
937 945 """read one part and process it with restricted capability
938 946
939 947 This allows to transmit exception raised on the producer size during part
940 948 iteration while the consumer is reading a part.
941 949
942 950 Part processed in this manner only have access to a ui object,"""
943 951
944 952 def __init__(self, ui, fp):
945 953 super(interrupthandler, self).__init__(fp)
946 954 self.ui = ui
947 955
948 956 def _readpartheader(self):
949 957 """reads a part header size and return the bytes blob
950 958
951 959 returns None if empty"""
952 960 headersize = self._unpack(_fpartheadersize)[0]
953 961 if headersize < 0:
954 962 raise error.BundleValueError('negative part header size: %i'
955 963 % headersize)
956 964 indebug(self.ui, 'part header size: %i\n' % headersize)
957 965 if headersize:
958 966 return self._readexact(headersize)
959 967 return None
960 968
961 969 def __call__(self):
962 970
963 971 self.ui.debug('bundle2-input-stream-interrupt:'
964 972 ' opening out of band context\n')
965 973 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
966 974 headerblock = self._readpartheader()
967 975 if headerblock is None:
968 976 indebug(self.ui, 'no part found during interruption.')
969 977 return
970 978 part = unbundlepart(self.ui, headerblock, self._fp)
971 979 op = interruptoperation(self.ui)
972 980 _processpart(op, part)
973 981 self.ui.debug('bundle2-input-stream-interrupt:'
974 982 ' closing out of band context\n')
975 983
976 984 class interruptoperation(object):
977 985 """A limited operation to be use by part handler during interruption
978 986
979 987 It only have access to an ui object.
980 988 """
981 989
982 990 def __init__(self, ui):
983 991 self.ui = ui
984 992 self.reply = None
985 993 self.captureoutput = False
986 994
987 995 @property
988 996 def repo(self):
989 997 raise RuntimeError('no repo access from stream interruption')
990 998
991 999 def gettransaction(self):
992 1000 raise TransactionUnavailable('no repo access from stream interruption')
993 1001
994 1002 class unbundlepart(unpackermixin):
995 1003 """a bundle part read from a bundle"""
996 1004
997 1005 def __init__(self, ui, header, fp):
998 1006 super(unbundlepart, self).__init__(fp)
999 1007 self.ui = ui
1000 1008 # unbundle state attr
1001 1009 self._headerdata = header
1002 1010 self._headeroffset = 0
1003 1011 self._initialized = False
1004 1012 self.consumed = False
1005 1013 # part data
1006 1014 self.id = None
1007 1015 self.type = None
1008 1016 self.mandatoryparams = None
1009 1017 self.advisoryparams = None
1010 1018 self.params = None
1011 1019 self.mandatorykeys = ()
1012 1020 self._payloadstream = None
1013 1021 self._readheader()
1014 1022 self._mandatory = None
1015 1023 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1016 1024 self._pos = 0
1017 1025
1018 1026 def _fromheader(self, size):
1019 1027 """return the next <size> byte from the header"""
1020 1028 offset = self._headeroffset
1021 1029 data = self._headerdata[offset:(offset + size)]
1022 1030 self._headeroffset = offset + size
1023 1031 return data
1024 1032
1025 1033 def _unpackheader(self, format):
1026 1034 """read given format from header
1027 1035
1028 1036 This automatically compute the size of the format to read."""
1029 1037 data = self._fromheader(struct.calcsize(format))
1030 1038 return _unpack(format, data)
1031 1039
1032 1040 def _initparams(self, mandatoryparams, advisoryparams):
1033 1041 """internal function to setup all logic related parameters"""
1034 1042 # make it read only to prevent people touching it by mistake.
1035 1043 self.mandatoryparams = tuple(mandatoryparams)
1036 1044 self.advisoryparams = tuple(advisoryparams)
1037 1045 # user friendly UI
1038 1046 self.params = dict(self.mandatoryparams)
1039 1047 self.params.update(dict(self.advisoryparams))
1040 1048 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1041 1049
1042 1050 def _payloadchunks(self, chunknum=0):
1043 1051 '''seek to specified chunk and start yielding data'''
1044 1052 if len(self._chunkindex) == 0:
1045 1053 assert chunknum == 0, 'Must start with chunk 0'
1046 1054 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1047 1055 else:
1048 1056 assert chunknum < len(self._chunkindex), \
1049 1057 'Unknown chunk %d' % chunknum
1050 1058 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1051 1059
1052 1060 pos = self._chunkindex[chunknum][0]
1053 1061 payloadsize = self._unpack(_fpayloadsize)[0]
1054 1062 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1055 1063 while payloadsize:
1056 1064 if payloadsize == flaginterrupt:
1057 1065 # interruption detection, the handler will now read a
1058 1066 # single part and process it.
1059 1067 interrupthandler(self.ui, self._fp)()
1060 1068 elif payloadsize < 0:
1061 1069 msg = 'negative payload chunk size: %i' % payloadsize
1062 1070 raise error.BundleValueError(msg)
1063 1071 else:
1064 1072 result = self._readexact(payloadsize)
1065 1073 chunknum += 1
1066 1074 pos += payloadsize
1067 1075 if chunknum == len(self._chunkindex):
1068 1076 self._chunkindex.append((pos,
1069 1077 super(unbundlepart, self).tell()))
1070 1078 yield result
1071 1079 payloadsize = self._unpack(_fpayloadsize)[0]
1072 1080 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1073 1081
1074 1082 def _findchunk(self, pos):
1075 1083 '''for a given payload position, return a chunk number and offset'''
1076 1084 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1077 1085 if ppos == pos:
1078 1086 return chunk, 0
1079 1087 elif ppos > pos:
1080 1088 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1081 1089 raise ValueError('Unknown chunk')
1082 1090
1083 1091 def _readheader(self):
1084 1092 """read the header and setup the object"""
1085 1093 typesize = self._unpackheader(_fparttypesize)[0]
1086 1094 self.type = self._fromheader(typesize)
1087 1095 indebug(self.ui, 'part type: "%s"' % self.type)
1088 1096 self.id = self._unpackheader(_fpartid)[0]
1089 1097 indebug(self.ui, 'part id: "%s"' % self.id)
1090 1098 # extract mandatory bit from type
1091 1099 self.mandatory = (self.type != self.type.lower())
1092 1100 self.type = self.type.lower()
1093 1101 ## reading parameters
1094 1102 # param count
1095 1103 mancount, advcount = self._unpackheader(_fpartparamcount)
1096 1104 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1097 1105 # param size
1098 1106 fparamsizes = _makefpartparamsizes(mancount + advcount)
1099 1107 paramsizes = self._unpackheader(fparamsizes)
1100 1108 # make it a list of couple again
1101 1109 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1102 1110 # split mandatory from advisory
1103 1111 mansizes = paramsizes[:mancount]
1104 1112 advsizes = paramsizes[mancount:]
1105 1113 # retrieve param value
1106 1114 manparams = []
1107 1115 for key, value in mansizes:
1108 1116 manparams.append((self._fromheader(key), self._fromheader(value)))
1109 1117 advparams = []
1110 1118 for key, value in advsizes:
1111 1119 advparams.append((self._fromheader(key), self._fromheader(value)))
1112 1120 self._initparams(manparams, advparams)
1113 1121 ## part payload
1114 1122 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1115 1123 # we read the data, tell it
1116 1124 self._initialized = True
1117 1125
1118 1126 def read(self, size=None):
1119 1127 """read payload data"""
1120 1128 if not self._initialized:
1121 1129 self._readheader()
1122 1130 if size is None:
1123 1131 data = self._payloadstream.read()
1124 1132 else:
1125 1133 data = self._payloadstream.read(size)
1126 1134 self._pos += len(data)
1127 1135 if size is None or len(data) < size:
1128 1136 if not self.consumed and self._pos:
1129 1137 self.ui.debug('bundle2-input-part: total payload size %i\n'
1130 1138 % self._pos)
1131 1139 self.consumed = True
1132 1140 return data
1133 1141
1134 1142 def tell(self):
1135 1143 return self._pos
1136 1144
1137 1145 def seek(self, offset, whence=0):
1138 1146 if whence == 0:
1139 1147 newpos = offset
1140 1148 elif whence == 1:
1141 1149 newpos = self._pos + offset
1142 1150 elif whence == 2:
1143 1151 if not self.consumed:
1144 1152 self.read()
1145 1153 newpos = self._chunkindex[-1][0] - offset
1146 1154 else:
1147 1155 raise ValueError('Unknown whence value: %r' % (whence,))
1148 1156
1149 1157 if newpos > self._chunkindex[-1][0] and not self.consumed:
1150 1158 self.read()
1151 1159 if not 0 <= newpos <= self._chunkindex[-1][0]:
1152 1160 raise ValueError('Offset out of range')
1153 1161
1154 1162 if self._pos != newpos:
1155 1163 chunk, internaloffset = self._findchunk(newpos)
1156 1164 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1157 1165 adjust = self.read(internaloffset)
1158 1166 if len(adjust) != internaloffset:
1159 1167 raise util.Abort(_('Seek failed\n'))
1160 1168 self._pos = newpos
1161 1169
1162 1170 # These are only the static capabilities.
1163 1171 # Check the 'getrepocaps' function for the rest.
1164 1172 capabilities = {'HG20': (),
1165 1173 'error': ('abort', 'unsupportedcontent', 'pushraced',
1166 1174 'pushkey'),
1167 1175 'listkeys': (),
1168 1176 'pushkey': (),
1169 1177 'digests': tuple(sorted(util.DIGESTS.keys())),
1170 1178 'remote-changegroup': ('http', 'https'),
1171 1179 'hgtagsfnodes': (),
1172 1180 }
1173 1181
1174 1182 def getrepocaps(repo, allowpushback=False):
1175 1183 """return the bundle2 capabilities for a given repo
1176 1184
1177 1185 Exists to allow extensions (like evolution) to mutate the capabilities.
1178 1186 """
1179 1187 caps = capabilities.copy()
1180 1188 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1181 1189 if obsolete.isenabled(repo, obsolete.exchangeopt):
1182 1190 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1183 1191 caps['obsmarkers'] = supportedformat
1184 1192 if allowpushback:
1185 1193 caps['pushback'] = ()
1186 1194 return caps
1187 1195
1188 1196 def bundle2caps(remote):
1189 1197 """return the bundle capabilities of a peer as dict"""
1190 1198 raw = remote.capable('bundle2')
1191 1199 if not raw and raw != '':
1192 1200 return {}
1193 1201 capsblob = urllib.unquote(remote.capable('bundle2'))
1194 1202 return decodecaps(capsblob)
1195 1203
1196 1204 def obsmarkersversion(caps):
1197 1205 """extract the list of supported obsmarkers versions from a bundle2caps dict
1198 1206 """
1199 1207 obscaps = caps.get('obsmarkers', ())
1200 1208 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1201 1209
1202 1210 @parthandler('changegroup', ('version', 'nbchanges'))
1203 1211 def handlechangegroup(op, inpart):
1204 1212 """apply a changegroup part on the repo
1205 1213
1206 1214 This is a very early implementation that will massive rework before being
1207 1215 inflicted to any end-user.
1208 1216 """
1209 1217 # Make sure we trigger a transaction creation
1210 1218 #
1211 1219 # The addchangegroup function will get a transaction object by itself, but
1212 1220 # we need to make sure we trigger the creation of a transaction object used
1213 1221 # for the whole processing scope.
1214 1222 op.gettransaction()
1215 1223 unpackerversion = inpart.params.get('version', '01')
1216 1224 # We should raise an appropriate exception here
1217 1225 unpacker = changegroup.packermap[unpackerversion][1]
1218 1226 cg = unpacker(inpart, None)
1219 1227 # the source and url passed here are overwritten by the one contained in
1220 1228 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1221 1229 nbchangesets = None
1222 1230 if 'nbchanges' in inpart.params:
1223 1231 nbchangesets = int(inpart.params.get('nbchanges'))
1224 1232 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1225 1233 expectedtotal=nbchangesets)
1226 1234 op.records.add('changegroup', {'return': ret})
1227 1235 if op.reply is not None:
1228 1236 # This is definitely not the final form of this
1229 1237 # return. But one need to start somewhere.
1230 1238 part = op.reply.newpart('reply:changegroup', mandatory=False)
1231 1239 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1232 1240 part.addparam('return', '%i' % ret, mandatory=False)
1233 1241 assert not inpart.read()
1234 1242
1235 1243 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1236 1244 ['digest:%s' % k for k in util.DIGESTS.keys()])
1237 1245 @parthandler('remote-changegroup', _remotechangegroupparams)
1238 1246 def handleremotechangegroup(op, inpart):
1239 1247 """apply a bundle10 on the repo, given an url and validation information
1240 1248
1241 1249 All the information about the remote bundle to import are given as
1242 1250 parameters. The parameters include:
1243 1251 - url: the url to the bundle10.
1244 1252 - size: the bundle10 file size. It is used to validate what was
1245 1253 retrieved by the client matches the server knowledge about the bundle.
1246 1254 - digests: a space separated list of the digest types provided as
1247 1255 parameters.
1248 1256 - digest:<digest-type>: the hexadecimal representation of the digest with
1249 1257 that name. Like the size, it is used to validate what was retrieved by
1250 1258 the client matches what the server knows about the bundle.
1251 1259
1252 1260 When multiple digest types are given, all of them are checked.
1253 1261 """
1254 1262 try:
1255 1263 raw_url = inpart.params['url']
1256 1264 except KeyError:
1257 1265 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1258 1266 parsed_url = util.url(raw_url)
1259 1267 if parsed_url.scheme not in capabilities['remote-changegroup']:
1260 1268 raise util.Abort(_('remote-changegroup does not support %s urls') %
1261 1269 parsed_url.scheme)
1262 1270
1263 1271 try:
1264 1272 size = int(inpart.params['size'])
1265 1273 except ValueError:
1266 1274 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1267 1275 % 'size')
1268 1276 except KeyError:
1269 1277 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1270 1278
1271 1279 digests = {}
1272 1280 for typ in inpart.params.get('digests', '').split():
1273 1281 param = 'digest:%s' % typ
1274 1282 try:
1275 1283 value = inpart.params[param]
1276 1284 except KeyError:
1277 1285 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1278 1286 param)
1279 1287 digests[typ] = value
1280 1288
1281 1289 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1282 1290
1283 1291 # Make sure we trigger a transaction creation
1284 1292 #
1285 1293 # The addchangegroup function will get a transaction object by itself, but
1286 1294 # we need to make sure we trigger the creation of a transaction object used
1287 1295 # for the whole processing scope.
1288 1296 op.gettransaction()
1289 1297 from . import exchange
1290 1298 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1291 1299 if not isinstance(cg, changegroup.cg1unpacker):
1292 1300 raise util.Abort(_('%s: not a bundle version 1.0') %
1293 1301 util.hidepassword(raw_url))
1294 1302 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1295 1303 op.records.add('changegroup', {'return': ret})
1296 1304 if op.reply is not None:
1297 1305 # This is definitely not the final form of this
1298 1306 # return. But one need to start somewhere.
1299 1307 part = op.reply.newpart('reply:changegroup')
1300 1308 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1301 1309 part.addparam('return', '%i' % ret, mandatory=False)
1302 1310 try:
1303 1311 real_part.validate()
1304 1312 except util.Abort as e:
1305 1313 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1306 1314 (util.hidepassword(raw_url), str(e)))
1307 1315 assert not inpart.read()
1308 1316
1309 1317 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1310 1318 def handlereplychangegroup(op, inpart):
1311 1319 ret = int(inpart.params['return'])
1312 1320 replyto = int(inpart.params['in-reply-to'])
1313 1321 op.records.add('changegroup', {'return': ret}, replyto)
1314 1322
1315 1323 @parthandler('check:heads')
1316 1324 def handlecheckheads(op, inpart):
1317 1325 """check that head of the repo did not change
1318 1326
1319 1327 This is used to detect a push race when using unbundle.
1320 1328 This replaces the "heads" argument of unbundle."""
1321 1329 h = inpart.read(20)
1322 1330 heads = []
1323 1331 while len(h) == 20:
1324 1332 heads.append(h)
1325 1333 h = inpart.read(20)
1326 1334 assert not h
1327 1335 if heads != op.repo.heads():
1328 1336 raise error.PushRaced('repository changed while pushing - '
1329 1337 'please try again')
1330 1338
1331 1339 @parthandler('output')
1332 1340 def handleoutput(op, inpart):
1333 1341 """forward output captured on the server to the client"""
1334 1342 for line in inpart.read().splitlines():
1335 1343 op.ui.status(('remote: %s\n' % line))
1336 1344
1337 1345 @parthandler('replycaps')
1338 1346 def handlereplycaps(op, inpart):
1339 1347 """Notify that a reply bundle should be created
1340 1348
1341 1349 The payload contains the capabilities information for the reply"""
1342 1350 caps = decodecaps(inpart.read())
1343 1351 if op.reply is None:
1344 1352 op.reply = bundle20(op.ui, caps)
1345 1353
1346 1354 @parthandler('error:abort', ('message', 'hint'))
1347 1355 def handleerrorabort(op, inpart):
1348 1356 """Used to transmit abort error over the wire"""
1349 1357 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1350 1358
1351 1359 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1352 1360 'in-reply-to'))
1353 1361 def handleerrorpushkey(op, inpart):
1354 1362 """Used to transmit failure of a mandatory pushkey over the wire"""
1355 1363 kwargs = {}
1356 1364 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1357 1365 value = inpart.params.get(name)
1358 1366 if value is not None:
1359 1367 kwargs[name] = value
1360 1368 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1361 1369
1362 1370 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1363 1371 def handleerrorunsupportedcontent(op, inpart):
1364 1372 """Used to transmit unknown content error over the wire"""
1365 1373 kwargs = {}
1366 1374 parttype = inpart.params.get('parttype')
1367 1375 if parttype is not None:
1368 1376 kwargs['parttype'] = parttype
1369 1377 params = inpart.params.get('params')
1370 1378 if params is not None:
1371 1379 kwargs['params'] = params.split('\0')
1372 1380
1373 1381 raise error.BundleUnknownFeatureError(**kwargs)
1374 1382
1375 1383 @parthandler('error:pushraced', ('message',))
1376 1384 def handleerrorpushraced(op, inpart):
1377 1385 """Used to transmit push race error over the wire"""
1378 1386 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1379 1387
1380 1388 @parthandler('listkeys', ('namespace',))
1381 1389 def handlelistkeys(op, inpart):
1382 1390 """retrieve pushkey namespace content stored in a bundle2"""
1383 1391 namespace = inpart.params['namespace']
1384 1392 r = pushkey.decodekeys(inpart.read())
1385 1393 op.records.add('listkeys', (namespace, r))
1386 1394
1387 1395 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1388 1396 def handlepushkey(op, inpart):
1389 1397 """process a pushkey request"""
1390 1398 dec = pushkey.decode
1391 1399 namespace = dec(inpart.params['namespace'])
1392 1400 key = dec(inpart.params['key'])
1393 1401 old = dec(inpart.params['old'])
1394 1402 new = dec(inpart.params['new'])
1395 1403 ret = op.repo.pushkey(namespace, key, old, new)
1396 1404 record = {'namespace': namespace,
1397 1405 'key': key,
1398 1406 'old': old,
1399 1407 'new': new}
1400 1408 op.records.add('pushkey', record)
1401 1409 if op.reply is not None:
1402 1410 rpart = op.reply.newpart('reply:pushkey')
1403 1411 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1404 1412 rpart.addparam('return', '%i' % ret, mandatory=False)
1405 1413 if inpart.mandatory and not ret:
1406 1414 kwargs = {}
1407 1415 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1408 1416 if key in inpart.params:
1409 1417 kwargs[key] = inpart.params[key]
1410 1418 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1411 1419
1412 1420 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1413 1421 def handlepushkeyreply(op, inpart):
1414 1422 """retrieve the result of a pushkey request"""
1415 1423 ret = int(inpart.params['return'])
1416 1424 partid = int(inpart.params['in-reply-to'])
1417 1425 op.records.add('pushkey', {'return': ret}, partid)
1418 1426
1419 1427 @parthandler('obsmarkers')
1420 1428 def handleobsmarker(op, inpart):
1421 1429 """add a stream of obsmarkers to the repo"""
1422 1430 tr = op.gettransaction()
1423 1431 markerdata = inpart.read()
1424 1432 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1425 1433 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1426 1434 % len(markerdata))
1427 1435 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1428 1436 if new:
1429 1437 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1430 1438 op.records.add('obsmarkers', {'new': new})
1431 1439 if op.reply is not None:
1432 1440 rpart = op.reply.newpart('reply:obsmarkers')
1433 1441 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1434 1442 rpart.addparam('new', '%i' % new, mandatory=False)
1435 1443
1436 1444
1437 1445 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1438 1446 def handleobsmarkerreply(op, inpart):
1439 1447 """retrieve the result of a pushkey request"""
1440 1448 ret = int(inpart.params['new'])
1441 1449 partid = int(inpart.params['in-reply-to'])
1442 1450 op.records.add('obsmarkers', {'new': ret}, partid)
1443 1451
1444 1452 @parthandler('hgtagsfnodes')
1445 1453 def handlehgtagsfnodes(op, inpart):
1446 1454 """Applies .hgtags fnodes cache entries to the local repo.
1447 1455
1448 1456 Payload is pairs of 20 byte changeset nodes and filenodes.
1449 1457 """
1450 1458 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1451 1459
1452 1460 count = 0
1453 1461 while True:
1454 1462 node = inpart.read(20)
1455 1463 fnode = inpart.read(20)
1456 1464 if len(node) < 20 or len(fnode) < 20:
1457 1465 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1458 1466 break
1459 1467 cache.setfnode(node, fnode)
1460 1468 count += 1
1461 1469
1462 1470 cache.write()
1463 1471 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now