unbundle20: allow registering handlers for stream level parameters...
Pierre-Yves David -
r26395:4e7b0bf9 default
@@ -1,1422 +1,1434 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
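For instance (purely illustrative parameter names), a parameter blob carrying
one mandatory and one advisory parameter could look like::

  Examplecheck=1 comment=hello%20world

``Examplecheck`` starts with a capital letter and is therefore mandatory, while
``comment`` is advisory; the space in its value is urlquoted.
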
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler that
78 78 can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
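
For instance (illustrative), a single mandatory parameter with key ``foo`` and
value ``bar``, and no advisory parameter, would be encoded as the bytes::

  0x01 0x00 0x03 0x03 'foobar'

that is: one mandatory parameter, zero advisory parameters, one
(<size-of-key>, <size-of-value>) couple of (3, 3), then the concatenated key
and value.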
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
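
For example (illustrative), an 11 byte payload sent as a single chunk would
appear on the wire as ``<int32: 11><11 bytes of data><int32: 0>``.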
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
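
A minimal, illustrative round trip (assuming ``ui`` is a Mercurial ui object;
the 'output' part type and the 'ecomment' parameter are used purely as
examples)::

  bundler = bundle20(ui)
  bundler.addparam('ecomment', 'demo')     # advisory stream level parameter
  bundler.newpart('output', data='hello', mandatory=False)
  raw = ''.join(bundler.getchunks())

  import cStringIO
  unbundler = getunbundler(ui, cStringIO.StringIO(raw))
  parts = [(p.type, p.read()) for p in unbundler.iterparts()]
  # parts == [('output', 'hello')]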
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155 import urllib
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 obsolete,
162 162 pushkey,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 _pack = struct.pack
169 169 _unpack = struct.unpack
170 170
171 171 _fstreamparamsize = '>i'
172 172 _fpartheadersize = '>i'
173 173 _fparttypesize = '>B'
174 174 _fpartid = '>I'
175 175 _fpayloadsize = '>i'
176 176 _fpartparamcount = '>BB'
177 177
178 178 preferedchunksize = 4096
179 179
180 180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181 181
182 182 def outdebug(ui, message):
183 183 """debug regarding output stream (bundling)"""
184 184 if ui.configbool('devel', 'bundle2.debug', False):
185 185 ui.debug('bundle2-output: %s\n' % message)
186 186
187 187 def indebug(ui, message):
188 188 """debug on input stream (unbundling)"""
189 189 if ui.configbool('devel', 'bundle2.debug', False):
190 190 ui.debug('bundle2-input: %s\n' % message)
191 191
192 192 def validateparttype(parttype):
193 193 """raise ValueError if a parttype contains invalid character"""
194 194 if _parttypeforbidden.search(parttype):
195 195 raise ValueError(parttype)
196 196
197 197 def _makefpartparamsizes(nbparams):
198 198 """return a struct format to read part parameter sizes
199 199
200 200 The number of parameters is variable so we need to build that format
201 201 dynamically.
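
For example, ``_makefpartparamsizes(2)`` returns ``'>BBBB'``, suitable for
reading two (key size, value size) couples.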
202 202 """
203 203 return '>'+('BB'*nbparams)
204 204
205 205 parthandlermapping = {}
206 206
207 207 def parthandler(parttype, params=()):
208 208 """decorator that register a function as a bundle2 part handler
209 209
210 210 eg::
211 211
212 212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 213 def myparttypehandler(...):
214 214 '''process a part of type "my part".'''
215 215 ...
216 216 """
217 217 validateparttype(parttype)
218 218 def _decorator(func):
219 219 lparttype = parttype.lower() # enforce lower case matching.
220 220 assert lparttype not in parthandlermapping
221 221 parthandlermapping[lparttype] = func
222 222 func.params = frozenset(params)
223 223 return func
224 224 return _decorator
225 225
226 226 class unbundlerecords(object):
227 227 """keep record of what happens during and unbundle
228 228
229 229 New records are added using `records.add('cat', obj)`, where 'cat' is a
230 230 category of record and obj is an arbitrary object.
231 231
232 232 `records['cat']` will return all entries of this category 'cat'.
233 233
234 234 Iterating on the object itself will yield `('category', obj)` tuples
235 235 for all entries.
236 236
237 237 All iterations happen in chronological order.
238 238 """
239 239
240 240 def __init__(self):
241 241 self._categories = {}
242 242 self._sequences = []
243 243 self._replies = {}
244 244
245 245 def add(self, category, entry, inreplyto=None):
246 246 """add a new record of a given category.
247 247
248 248 The entry can then be retrieved in the list returned by
249 249 self['category']."""
250 250 self._categories.setdefault(category, []).append(entry)
251 251 self._sequences.append((category, entry))
252 252 if inreplyto is not None:
253 253 self.getreplies(inreplyto).add(category, entry)
254 254
255 255 def getreplies(self, partid):
256 256 """get the records that are replies to a specific part"""
257 257 return self._replies.setdefault(partid, unbundlerecords())
258 258
259 259 def __getitem__(self, cat):
260 260 return tuple(self._categories.get(cat, ()))
261 261
262 262 def __iter__(self):
263 263 return iter(self._sequences)
264 264
265 265 def __len__(self):
266 266 return len(self._sequences)
267 267
268 268 def __nonzero__(self):
269 269 return bool(self._sequences)
270 270
271 271 class bundleoperation(object):
272 272 """an object that represents a single bundling process
273 273
274 274 Its purpose is to carry unbundle-related objects and states.
275 275
276 276 A new object should be created at the beginning of each bundle processing.
277 277 The object is to be returned by the processing function.
278 278
279 279 The object has very little content now; it will ultimately contain:
280 280 * an access to the repo the bundle is applied to,
281 281 * a ui object,
282 282 * a way to retrieve a transaction to add changes to the repo,
283 283 * a way to record the result of processing each part,
284 284 * a way to construct a bundle response when applicable.
285 285 """
286 286
287 287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 288 self.repo = repo
289 289 self.ui = repo.ui
290 290 self.records = unbundlerecords()
291 291 self.gettransaction = transactiongetter
292 292 self.reply = None
293 293 self.captureoutput = captureoutput
294 294
295 295 class TransactionUnavailable(RuntimeError):
296 296 pass
297 297
298 298 def _notransaction():
299 299 """default method to get a transaction while processing a bundle
300 300
301 301 Raise an exception to highlight the fact that no transaction was expected
302 302 to be created"""
303 303 raise TransactionUnavailable()
304 304
305 305 def processbundle(repo, unbundler, transactiongetter=None, op=None):
306 306 """This function process a bundle, apply effect to/from a repo
307 307
308 308 It iterates over each part then searches for and uses the proper handling
309 309 code to process the part. Parts are processed in order.
310 310
311 311 This is a very early version of this function that will be strongly reworked
312 312 before final usage.
313 313
314 314 An unknown mandatory part will abort the process.
315 315
316 316 It is temporarily possible to provide a prebuilt bundleoperation to the
317 317 function. This is used to ensure output is properly propagated in case of
318 318 an error during the unbundling. This output capturing part will likely be
319 319 reworked and this ability will probably go away in the process.
320 320 """
321 321 if op is None:
322 322 if transactiongetter is None:
323 323 transactiongetter = _notransaction
324 324 op = bundleoperation(repo, transactiongetter)
325 325 # todo:
326 326 # - replace this with an init function soon.
327 327 # - exception catching
328 328 unbundler.params
329 329 if repo.ui.debugflag:
330 330 msg = ['bundle2-input-bundle:']
331 331 if unbundler.params:
332 332 msg.append(' %i params' % len(unbundler.params))
333 333 if op.gettransaction is None:
334 334 msg.append(' no-transaction')
335 335 else:
336 336 msg.append(' with-transaction')
337 337 msg.append('\n')
338 338 repo.ui.debug(''.join(msg))
339 339 iterparts = enumerate(unbundler.iterparts())
340 340 part = None
341 341 nbpart = 0
342 342 try:
343 343 for nbpart, part in iterparts:
344 344 _processpart(op, part)
345 345 except BaseException as exc:
346 346 for nbpart, part in iterparts:
347 347 # consume the bundle content
348 348 part.seek(0, 2)
349 349 # Small hack to let caller code distinguish exceptions raised during
350 350 # bundle2 processing from those raised by the old format. This is mostly
351 351 # needed to handle different return codes to unbundle according to the
352 352 # type of bundle. We should probably clean up or drop this return code
353 353 # craziness in a future version.
354 354 exc.duringunbundle2 = True
355 355 salvaged = []
356 356 replycaps = None
357 357 if op.reply is not None:
358 358 salvaged = op.reply.salvageoutput()
359 359 replycaps = op.reply.capabilities
360 360 exc._replycaps = replycaps
361 361 exc._bundle2salvagedoutput = salvaged
362 362 raise
363 363 finally:
364 364 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
365 365
366 366 return op
367 367
368 368 def _processpart(op, part):
369 369 """process a single part from a bundle
370 370
371 371 The part is guaranteed to have been fully consumed when the function exits
372 372 (even if an exception is raised)."""
373 373 status = 'unknown' # used by debug output
374 374 try:
375 375 try:
376 376 handler = parthandlermapping.get(part.type)
377 377 if handler is None:
378 378 status = 'unsupported-type'
379 379 raise error.BundleUnknownFeatureError(parttype=part.type)
380 380 indebug(op.ui, 'found a handler for part %r' % part.type)
381 381 unknownparams = part.mandatorykeys - handler.params
382 382 if unknownparams:
383 383 unknownparams = list(unknownparams)
384 384 unknownparams.sort()
385 385 status = 'unsupported-params (%s)' % unknownparams
386 386 raise error.BundleUnknownFeatureError(parttype=part.type,
387 387 params=unknownparams)
388 388 status = 'supported'
389 389 except error.BundleUnknownFeatureError as exc:
390 390 if part.mandatory: # mandatory parts
391 391 raise
392 392 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
393 393 return # skip to part processing
394 394 finally:
395 395 if op.ui.debugflag:
396 396 msg = ['bundle2-input-part: "%s"' % part.type]
397 397 if not part.mandatory:
398 398 msg.append(' (advisory)')
399 399 nbmp = len(part.mandatorykeys)
400 400 nbap = len(part.params) - nbmp
401 401 if nbmp or nbap:
402 402 msg.append(' (params:')
403 403 if nbmp:
404 404 msg.append(' %i mandatory' % nbmp)
405 405 if nbap:
406 406 msg.append(' %i advisory' % nbap)
407 407 msg.append(')')
408 408 msg.append(' %s\n' % status)
409 409 op.ui.debug(''.join(msg))
410 410
411 411 # handler is called outside the above try block so that we don't
412 412 # risk catching KeyErrors from anything other than the
413 413 # parthandlermapping lookup (any KeyError raised by handler()
414 414 # itself represents a defect of a different variety).
415 415 output = None
416 416 if op.captureoutput and op.reply is not None:
417 417 op.ui.pushbuffer(error=True, subproc=True)
418 418 output = ''
419 419 try:
420 420 handler(op, part)
421 421 finally:
422 422 if output is not None:
423 423 output = op.ui.popbuffer()
424 424 if output:
425 425 outpart = op.reply.newpart('output', data=output,
426 426 mandatory=False)
427 427 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
428 428 finally:
429 429 # consume the part content to not corrupt the stream.
430 430 part.seek(0, 2)
431 431
432 432
433 433 def decodecaps(blob):
434 434 """decode a bundle2 caps bytes blob into a dictionary
435 435
436 436 The blob is a list of capabilities (one per line)
437 437 Capabilities may have values using a line of the form::
438 438
439 439 capability=value1,value2,value3
440 440
441 441 The values are always a list."""
442 442 caps = {}
443 443 for line in blob.splitlines():
444 444 if not line:
445 445 continue
446 446 if '=' not in line:
447 447 key, vals = line, ()
448 448 else:
449 449 key, vals = line.split('=', 1)
450 450 vals = vals.split(',')
451 451 key = urllib.unquote(key)
452 452 vals = [urllib.unquote(v) for v in vals]
453 453 caps[key] = vals
454 454 return caps
455 455
456 456 def encodecaps(caps):
457 457 """encode a bundle2 caps dictionary into a bytes blob"""
458 458 chunks = []
459 459 for ca in sorted(caps):
460 460 vals = caps[ca]
461 461 ca = urllib.quote(ca)
462 462 vals = [urllib.quote(v) for v in vals]
463 463 if vals:
464 464 ca = "%s=%s" % (ca, ','.join(vals))
465 465 chunks.append(ca)
466 466 return '\n'.join(chunks)
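# For instance (illustrative values), ``encodecaps({'HG20': (), 'digests':
# ('md5', 'sha1')})`` returns 'HG20\ndigests=md5,sha1', and ``decodecaps``
# turns that blob back into a dictionary mapping each capability to a list of
# values.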
467 467
468 468 class bundle20(object):
469 469 """represent an outgoing bundle2 container
470 470
471 471 Use the `addparam` method to add a stream level parameter and `newpart` to
472 472 populate it. Then call `getchunks` to retrieve all the binary chunks of
473 473 data that compose the bundle2 container."""
474 474
475 475 _magicstring = 'HG20'
476 476
477 477 def __init__(self, ui, capabilities=()):
478 478 self.ui = ui
479 479 self._params = []
480 480 self._parts = []
481 481 self.capabilities = dict(capabilities)
482 482
483 483 @property
484 484 def nbparts(self):
485 485 """total number of parts added to the bundler"""
486 486 return len(self._parts)
487 487
488 488 # methods used to define the bundle2 content
489 489 def addparam(self, name, value=None):
490 490 """add a stream level parameter"""
491 491 if not name:
492 492 raise ValueError('empty parameter name')
493 493 if name[0] not in string.letters:
494 494 raise ValueError('non letter first character: %r' % name)
495 495 self._params.append((name, value))
496 496
497 497 def addpart(self, part):
498 498 """add a new part to the bundle2 container
499 499
500 500 Parts contain the actual applicative payload."""
501 501 assert part.id is None
502 502 part.id = len(self._parts) # very cheap counter
503 503 self._parts.append(part)
504 504
505 505 def newpart(self, typeid, *args, **kwargs):
506 506 """create a new part and add it to the containers
507 507
508 508 As the part is directly added to the containers. For now, this means
509 509 that any failure to properly initialize the part after calling
510 510 ``newpart`` should result in a failure of the whole bundling process.
511 511
512 512 You can still fall back to manually creating and adding the part if you
513 513 need better control."""
514 514 part = bundlepart(typeid, *args, **kwargs)
515 515 self.addpart(part)
516 516 return part
517 517
518 518 # methods used to generate the bundle2 stream
519 519 def getchunks(self):
520 520 if self.ui.debugflag:
521 521 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
522 522 if self._params:
523 523 msg.append(' (%i params)' % len(self._params))
524 524 msg.append(' %i parts total\n' % len(self._parts))
525 525 self.ui.debug(''.join(msg))
526 526 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
527 527 yield self._magicstring
528 528 param = self._paramchunk()
529 529 outdebug(self.ui, 'bundle parameter: %s' % param)
530 530 yield _pack(_fstreamparamsize, len(param))
531 531 if param:
532 532 yield param
533 533
534 534 outdebug(self.ui, 'start of parts')
535 535 for part in self._parts:
536 536 outdebug(self.ui, 'bundle part: "%s"' % part.type)
537 537 for chunk in part.getchunks(ui=self.ui):
538 538 yield chunk
539 539 outdebug(self.ui, 'end of bundle')
540 540 yield _pack(_fpartheadersize, 0)
541 541
542 542 def _paramchunk(self):
543 543 """return a encoded version of all stream parameters"""
544 544 blocks = []
545 545 for par, value in self._params:
546 546 par = urllib.quote(par)
547 547 if value is not None:
548 548 value = urllib.quote(value)
549 549 par = '%s=%s' % (par, value)
550 550 blocks.append(par)
551 551 return ' '.join(blocks)
552 552
553 553 def salvageoutput(self):
554 554 """return a list with a copy of all output parts in the bundle
555 555
556 556 This is meant to be used during error handling to make sure we preserve
557 557 server output"""
558 558 salvaged = []
559 559 for part in self._parts:
560 560 if part.type.startswith('output'):
561 561 salvaged.append(part.copy())
562 562 return salvaged
563 563
564 564
565 565 class unpackermixin(object):
566 566 """A mixin to extract bytes and struct data from a stream"""
567 567
568 568 def __init__(self, fp):
569 569 self._fp = fp
570 570 self._seekable = (util.safehasattr(fp, 'seek') and
571 571 util.safehasattr(fp, 'tell'))
572 572
573 573 def _unpack(self, format):
574 574 """unpack this struct format from the stream"""
575 575 data = self._readexact(struct.calcsize(format))
576 576 return _unpack(format, data)
577 577
578 578 def _readexact(self, size):
579 579 """read exactly <size> bytes from the stream"""
580 580 return changegroup.readexactly(self._fp, size)
581 581
582 582 def seek(self, offset, whence=0):
583 583 """move the underlying file pointer"""
584 584 if self._seekable:
585 585 return self._fp.seek(offset, whence)
586 586 else:
587 587 raise NotImplementedError(_('File pointer is not seekable'))
588 588
589 589 def tell(self):
590 590 """return the file offset, or None if file is not seekable"""
591 591 if self._seekable:
592 592 try:
593 593 return self._fp.tell()
594 594 except IOError as e:
595 595 if e.errno == errno.ESPIPE:
596 596 self._seekable = False
597 597 else:
598 598 raise
599 599 return None
600 600
601 601 def close(self):
602 602 """close underlying file"""
603 603 if util.safehasattr(self._fp, 'close'):
604 604 return self._fp.close()
605 605
606 606 def getunbundler(ui, fp, magicstring=None):
607 607 """return a valid unbundler object for a given magicstring"""
608 608 if magicstring is None:
609 609 magicstring = changegroup.readexactly(fp, 4)
610 610 magic, version = magicstring[0:2], magicstring[2:4]
611 611 if magic != 'HG':
612 612 raise util.Abort(_('not a Mercurial bundle'))
613 613 unbundlerclass = formatmap.get(version)
614 614 if unbundlerclass is None:
615 615 raise util.Abort(_('unknown bundle version %s') % version)
616 616 unbundler = unbundlerclass(ui, fp)
617 617 indebug(ui, 'start processing of %s stream' % magicstring)
618 618 return unbundler
619 619
620 620 class unbundle20(unpackermixin):
621 621 """interpret a bundle2 stream
622 622
623 623 This class is fed with a binary stream and yields parts through its
624 624 `iterparts` method."""
625 625
626 626 def __init__(self, ui, fp):
627 627 """If header is specified, we do not read it out of the stream."""
628 628 self.ui = ui
629 629 super(unbundle20, self).__init__(fp)
630 630
631 631 @util.propertycache
632 632 def params(self):
633 633 """dictionary of stream level parameters"""
634 634 indebug(self.ui, 'reading bundle2 stream parameters')
635 635 params = {}
636 636 paramssize = self._unpack(_fstreamparamsize)[0]
637 637 if paramssize < 0:
638 638 raise error.BundleValueError('negative bundle param size: %i'
639 639 % paramssize)
640 640 if paramssize:
641 641 for p in self._readexact(paramssize).split(' '):
642 642 p = p.split('=', 1)
643 643 p = [urllib.unquote(i) for i in p]
644 644 if len(p) < 2:
645 645 p.append(None)
646 646 self._processparam(*p)
647 647 params[p[0]] = p[1]
648 648 return params
649 649
650 650 def _processparam(self, name, value):
651 651 """process a parameter, applying its effect if needed
652 652
653 653 Parameters starting with a lower case letter are advisory and will be
654 654 ignored when unknown. Those starting with an upper case letter are
655 655 mandatory; this function will raise an error when one is unknown.
656 656
657 657 Note: no options are currently supported. Any input will either be
658 658 ignored or fail.
659 659 """
660 660 if not name:
661 661 raise ValueError('empty parameter name')
662 662 if name[0] not in string.letters:
663 663 raise ValueError('non letter first character: %r' % name)
664 # Some logic will be later added here to try to process the option for
665 # a dict of known parameter.
666 if name[0].islower():
667 indebug(self.ui, "ignoring unknown parameter %r" % name)
664 try:
665 handler = b2streamparamsmap[name.lower()]
666 except KeyError:
667 if name[0].islower():
668 indebug(self.ui, "ignoring unknown parameter %r" % name)
669 else:
670 raise error.BundleUnknownFeatureError(params=(name,))
668 671 else:
669 raise error.BundleUnknownFeatureError(params=(name,))
670
672 handler(self, name, value)
671 673
672 674 def iterparts(self):
673 675 """yield all parts contained in the stream"""
674 676 # make sure params have been loaded
675 677 self.params
676 678 indebug(self.ui, 'start extraction of bundle2 parts')
677 679 headerblock = self._readpartheader()
678 680 while headerblock is not None:
679 681 part = unbundlepart(self.ui, headerblock, self._fp)
680 682 yield part
681 683 part.seek(0, 2)
682 684 headerblock = self._readpartheader()
683 685 indebug(self.ui, 'end of bundle2 stream')
684 686
685 687 def _readpartheader(self):
686 688 """reads a part header size and return the bytes blob
687 689
688 690 returns None if empty"""
689 691 headersize = self._unpack(_fpartheadersize)[0]
690 692 if headersize < 0:
691 693 raise error.BundleValueError('negative part header size: %i'
692 694 % headersize)
693 695 indebug(self.ui, 'part header size: %i' % headersize)
694 696 if headersize:
695 697 return self._readexact(headersize)
696 698 return None
697 699
698 700 def compressed(self):
699 701 return False
700 702
701 703 formatmap = {'20': unbundle20}
702 704
705 b2streamparamsmap = {}
706
707 def b2streamparamhandler(name):
708 """register a handler for a stream level parameter"""
709 def decorator(func):
710 assert name not in b2streamparamsmap
711 b2streamparamsmap[name] = func
712 return func
713 return decorator
714
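# Example (illustrative only): a handler for a hypothetical advisory stream
# level parameter named 'examplecomment' could be registered as follows:
#
#   @b2streamparamhandler('examplecomment')
#   def handleexamplecomment(unbundler, name, value):
#       indebug(unbundler.ui, 'stream comment: %s' % value)
#
# ``unbundle20._processparam`` looks handlers up case-insensitively in
# ``b2streamparamsmap`` and calls them with the unbundler, the parameter name
# and its (possibly None) value.
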
703 715 class bundlepart(object):
704 716 """A bundle2 part contains application level payload
705 717
706 718 The part `type` is used to route the part to the application level
707 719 handler.
708 720
709 721 The part payload is contained in ``part.data``. It could be raw bytes or a
710 722 generator of byte chunks.
711 723
712 724 You can add parameters to the part using the ``addparam`` method.
713 725 Parameters can be either mandatory (default) or advisory. Remote side
714 726 should be able to safely ignore the advisory ones.
715 727
716 728 Neither data nor parameters can be modified after the generation has begun.
717 729 """
718 730
719 731 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
720 732 data='', mandatory=True):
721 733 validateparttype(parttype)
722 734 self.id = None
723 735 self.type = parttype
724 736 self._data = data
725 737 self._mandatoryparams = list(mandatoryparams)
726 738 self._advisoryparams = list(advisoryparams)
727 739 # checking for duplicated entries
728 740 self._seenparams = set()
729 741 for pname, __ in self._mandatoryparams + self._advisoryparams:
730 742 if pname in self._seenparams:
731 743 raise RuntimeError('duplicated params: %s' % pname)
732 744 self._seenparams.add(pname)
733 745 # status of the part's generation:
734 746 # - None: not started,
735 747 # - False: currently generated,
736 748 # - True: generation done.
737 749 self._generated = None
738 750 self.mandatory = mandatory
739 751
740 752 def copy(self):
741 753 """return a copy of the part
742 754
743 755 The new part has the very same content but no partid assigned yet.
744 756 Parts with generated data cannot be copied."""
745 757 assert not util.safehasattr(self.data, 'next')
746 758 return self.__class__(self.type, self._mandatoryparams,
747 759 self._advisoryparams, self._data, self.mandatory)
748 760
749 761 # methods used to define the part content
750 762 def __setdata(self, data):
751 763 if self._generated is not None:
752 764 raise error.ReadOnlyPartError('part is being generated')
753 765 self._data = data
754 766 def __getdata(self):
755 767 return self._data
756 768 data = property(__getdata, __setdata)
757 769
758 770 @property
759 771 def mandatoryparams(self):
760 772 # make it an immutable tuple to force people through ``addparam``
761 773 return tuple(self._mandatoryparams)
762 774
763 775 @property
764 776 def advisoryparams(self):
765 777 # make it an immutable tuple to force people through ``addparam``
766 778 return tuple(self._advisoryparams)
767 779
768 780 def addparam(self, name, value='', mandatory=True):
769 781 if self._generated is not None:
770 782 raise error.ReadOnlyPartError('part is being generated')
771 783 if name in self._seenparams:
772 784 raise ValueError('duplicated params: %s' % name)
773 785 self._seenparams.add(name)
774 786 params = self._advisoryparams
775 787 if mandatory:
776 788 params = self._mandatoryparams
777 789 params.append((name, value))
778 790
779 791 # methods used to generate the bundle2 stream
780 792 def getchunks(self, ui):
781 793 if self._generated is not None:
782 794 raise RuntimeError('part can only be consumed once')
783 795 self._generated = False
784 796
785 797 if ui.debugflag:
786 798 msg = ['bundle2-output-part: "%s"' % self.type]
787 799 if not self.mandatory:
788 800 msg.append(' (advisory)')
789 801 nbmp = len(self.mandatoryparams)
790 802 nbap = len(self.advisoryparams)
791 803 if nbmp or nbap:
792 804 msg.append(' (params:')
793 805 if nbmp:
794 806 msg.append(' %i mandatory' % nbmp)
795 807 if nbap:
796 808 msg.append(' %i advisory' % nbap)
797 809 msg.append(')')
798 810 if not self.data:
799 811 msg.append(' empty payload')
800 812 elif util.safehasattr(self.data, 'next'):
801 813 msg.append(' streamed payload')
802 814 else:
803 815 msg.append(' %i bytes payload' % len(self.data))
804 816 msg.append('\n')
805 817 ui.debug(''.join(msg))
806 818
807 819 #### header
808 820 if self.mandatory:
809 821 parttype = self.type.upper()
810 822 else:
811 823 parttype = self.type.lower()
812 824 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
813 825 ## parttype
814 826 header = [_pack(_fparttypesize, len(parttype)),
815 827 parttype, _pack(_fpartid, self.id),
816 828 ]
817 829 ## parameters
818 830 # count
819 831 manpar = self.mandatoryparams
820 832 advpar = self.advisoryparams
821 833 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
822 834 # size
823 835 parsizes = []
824 836 for key, value in manpar:
825 837 parsizes.append(len(key))
826 838 parsizes.append(len(value))
827 839 for key, value in advpar:
828 840 parsizes.append(len(key))
829 841 parsizes.append(len(value))
830 842 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
831 843 header.append(paramsizes)
832 844 # key, value
833 845 for key, value in manpar:
834 846 header.append(key)
835 847 header.append(value)
836 848 for key, value in advpar:
837 849 header.append(key)
838 850 header.append(value)
839 851 ## finalize header
840 852 headerchunk = ''.join(header)
841 853 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
842 854 yield _pack(_fpartheadersize, len(headerchunk))
843 855 yield headerchunk
844 856 ## payload
845 857 try:
846 858 for chunk in self._payloadchunks():
847 859 outdebug(ui, 'payload chunk size: %i' % len(chunk))
848 860 yield _pack(_fpayloadsize, len(chunk))
849 861 yield chunk
850 862 except GeneratorExit:
851 863 # GeneratorExit means that nobody is listening for our
852 864 # results anyway, so just bail quickly rather than trying
853 865 # to produce an error part.
854 866 ui.debug('bundle2-generatorexit\n')
855 867 raise
856 868 except BaseException as exc:
857 869 # backup exception data for later
858 870 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
859 871 % exc)
860 872 exc_info = sys.exc_info()
861 873 msg = 'unexpected error: %s' % exc
862 874 interpart = bundlepart('error:abort', [('message', msg)],
863 875 mandatory=False)
864 876 interpart.id = 0
865 877 yield _pack(_fpayloadsize, -1)
866 878 for chunk in interpart.getchunks(ui=ui):
867 879 yield chunk
868 880 outdebug(ui, 'closing payload chunk')
869 881 # abort current part payload
870 882 yield _pack(_fpayloadsize, 0)
871 883 raise exc_info[0], exc_info[1], exc_info[2]
872 884 # end of payload
873 885 outdebug(ui, 'closing payload chunk')
874 886 yield _pack(_fpayloadsize, 0)
875 887 self._generated = True
876 888
877 889 def _payloadchunks(self):
878 890 """yield chunks of a the part payload
879 891
880 892 Exists to handle the different methods to provide data to a part."""
881 893 # we only support fixed size data now.
882 894 # This will be improved in the future.
883 895 if util.safehasattr(self.data, 'next'):
884 896 buff = util.chunkbuffer(self.data)
885 897 chunk = buff.read(preferedchunksize)
886 898 while chunk:
887 899 yield chunk
888 900 chunk = buff.read(preferedchunksize)
889 901 elif len(self.data):
890 902 yield self.data
891 903
892 904
893 905 flaginterrupt = -1
894 906
895 907 class interrupthandler(unpackermixin):
896 908 """read one part and process it with restricted capability
897 909
898 910 This allows transmitting exceptions raised on the producer side during part
899 911 iteration while the consumer is reading a part.
900 912
901 913 Parts processed in this manner only have access to a ui object."""
902 914
903 915 def __init__(self, ui, fp):
904 916 super(interrupthandler, self).__init__(fp)
905 917 self.ui = ui
906 918
907 919 def _readpartheader(self):
908 920 """reads a part header size and return the bytes blob
909 921
910 922 returns None if empty"""
911 923 headersize = self._unpack(_fpartheadersize)[0]
912 924 if headersize < 0:
913 925 raise error.BundleValueError('negative part header size: %i'
914 926 % headersize)
915 927 indebug(self.ui, 'part header size: %i' % headersize)
916 928 if headersize:
917 929 return self._readexact(headersize)
918 930 return None
919 931
920 932 def __call__(self):
921 933
922 934 self.ui.debug('bundle2-input-stream-interrupt:'
923 935 ' opening out of band context\n')
924 936 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
925 937 headerblock = self._readpartheader()
926 938 if headerblock is None:
927 939 indebug(self.ui, 'no part found during interruption.')
928 940 return
929 941 part = unbundlepart(self.ui, headerblock, self._fp)
930 942 op = interruptoperation(self.ui)
931 943 _processpart(op, part)
932 944 self.ui.debug('bundle2-input-stream-interrupt:'
933 945 ' closing out of band context\n')
934 946
935 947 class interruptoperation(object):
936 948 """A limited operation to be use by part handler during interruption
937 949
938 950 It only has access to a ui object.
939 951 """
940 952
941 953 def __init__(self, ui):
942 954 self.ui = ui
943 955 self.reply = None
944 956 self.captureoutput = False
945 957
946 958 @property
947 959 def repo(self):
948 960 raise RuntimeError('no repo access from stream interruption')
949 961
950 962 def gettransaction(self):
951 963 raise TransactionUnavailable('no repo access from stream interruption')
952 964
953 965 class unbundlepart(unpackermixin):
954 966 """a bundle part read from a bundle"""
955 967
956 968 def __init__(self, ui, header, fp):
957 969 super(unbundlepart, self).__init__(fp)
958 970 self.ui = ui
959 971 # unbundle state attr
960 972 self._headerdata = header
961 973 self._headeroffset = 0
962 974 self._initialized = False
963 975 self.consumed = False
964 976 # part data
965 977 self.id = None
966 978 self.type = None
967 979 self.mandatoryparams = None
968 980 self.advisoryparams = None
969 981 self.params = None
970 982 self.mandatorykeys = ()
971 983 self._payloadstream = None
972 984 self._readheader()
973 985 self._mandatory = None
974 986 self._chunkindex = [] #(payload, file) position tuples for chunk starts
975 987 self._pos = 0
976 988
977 989 def _fromheader(self, size):
978 990 """return the next <size> byte from the header"""
979 991 offset = self._headeroffset
980 992 data = self._headerdata[offset:(offset + size)]
981 993 self._headeroffset = offset + size
982 994 return data
983 995
984 996 def _unpackheader(self, format):
985 997 """read given format from header
986 998
987 999 This automatically computes the size of the format to read."""
988 1000 data = self._fromheader(struct.calcsize(format))
989 1001 return _unpack(format, data)
990 1002
991 1003 def _initparams(self, mandatoryparams, advisoryparams):
992 1004 """internal function to setup all logic related parameters"""
993 1005 # make it read only to prevent people touching it by mistake.
994 1006 self.mandatoryparams = tuple(mandatoryparams)
995 1007 self.advisoryparams = tuple(advisoryparams)
996 1008 # user friendly UI
997 1009 self.params = dict(self.mandatoryparams)
998 1010 self.params.update(dict(self.advisoryparams))
999 1011 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1000 1012
1001 1013 def _payloadchunks(self, chunknum=0):
1002 1014 '''seek to specified chunk and start yielding data'''
1003 1015 if len(self._chunkindex) == 0:
1004 1016 assert chunknum == 0, 'Must start with chunk 0'
1005 1017 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1006 1018 else:
1007 1019 assert chunknum < len(self._chunkindex), \
1008 1020 'Unknown chunk %d' % chunknum
1009 1021 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1010 1022
1011 1023 pos = self._chunkindex[chunknum][0]
1012 1024 payloadsize = self._unpack(_fpayloadsize)[0]
1013 1025 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1014 1026 while payloadsize:
1015 1027 if payloadsize == flaginterrupt:
1016 1028 # interruption detection, the handler will now read a
1017 1029 # single part and process it.
1018 1030 interrupthandler(self.ui, self._fp)()
1019 1031 elif payloadsize < 0:
1020 1032 msg = 'negative payload chunk size: %i' % payloadsize
1021 1033 raise error.BundleValueError(msg)
1022 1034 else:
1023 1035 result = self._readexact(payloadsize)
1024 1036 chunknum += 1
1025 1037 pos += payloadsize
1026 1038 if chunknum == len(self._chunkindex):
1027 1039 self._chunkindex.append((pos,
1028 1040 super(unbundlepart, self).tell()))
1029 1041 yield result
1030 1042 payloadsize = self._unpack(_fpayloadsize)[0]
1031 1043 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1032 1044
1033 1045 def _findchunk(self, pos):
1034 1046 '''for a given payload position, return a chunk number and offset'''
1035 1047 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1036 1048 if ppos == pos:
1037 1049 return chunk, 0
1038 1050 elif ppos > pos:
1039 1051 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1040 1052 raise ValueError('Unknown chunk')
1041 1053
1042 1054 def _readheader(self):
1043 1055 """read the header and setup the object"""
1044 1056 typesize = self._unpackheader(_fparttypesize)[0]
1045 1057 self.type = self._fromheader(typesize)
1046 1058 indebug(self.ui, 'part type: "%s"' % self.type)
1047 1059 self.id = self._unpackheader(_fpartid)[0]
1048 1060 indebug(self.ui, 'part id: "%s"' % self.id)
1049 1061 # extract mandatory bit from type
1050 1062 self.mandatory = (self.type != self.type.lower())
1051 1063 self.type = self.type.lower()
1052 1064 ## reading parameters
1053 1065 # param count
1054 1066 mancount, advcount = self._unpackheader(_fpartparamcount)
1055 1067 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1056 1068 # param size
1057 1069 fparamsizes = _makefpartparamsizes(mancount + advcount)
1058 1070 paramsizes = self._unpackheader(fparamsizes)
1059 1071 # make it a list of couple again
1060 1072 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1061 1073 # split mandatory from advisory
1062 1074 mansizes = paramsizes[:mancount]
1063 1075 advsizes = paramsizes[mancount:]
1064 1076 # retrieve param value
1065 1077 manparams = []
1066 1078 for key, value in mansizes:
1067 1079 manparams.append((self._fromheader(key), self._fromheader(value)))
1068 1080 advparams = []
1069 1081 for key, value in advsizes:
1070 1082 advparams.append((self._fromheader(key), self._fromheader(value)))
1071 1083 self._initparams(manparams, advparams)
1072 1084 ## part payload
1073 1085 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1074 1086 # we read the data, tell it
1075 1087 self._initialized = True
1076 1088
1077 1089 def read(self, size=None):
1078 1090 """read payload data"""
1079 1091 if not self._initialized:
1080 1092 self._readheader()
1081 1093 if size is None:
1082 1094 data = self._payloadstream.read()
1083 1095 else:
1084 1096 data = self._payloadstream.read(size)
1085 1097 self._pos += len(data)
1086 1098 if size is None or len(data) < size:
1087 1099 if not self.consumed and self._pos:
1088 1100 self.ui.debug('bundle2-input-part: total payload size %i\n'
1089 1101 % self._pos)
1090 1102 self.consumed = True
1091 1103 return data
1092 1104
1093 1105 def tell(self):
1094 1106 return self._pos
1095 1107
1096 1108 def seek(self, offset, whence=0):
1097 1109 if whence == 0:
1098 1110 newpos = offset
1099 1111 elif whence == 1:
1100 1112 newpos = self._pos + offset
1101 1113 elif whence == 2:
1102 1114 if not self.consumed:
1103 1115 self.read()
1104 1116 newpos = self._chunkindex[-1][0] - offset
1105 1117 else:
1106 1118 raise ValueError('Unknown whence value: %r' % (whence,))
1107 1119
1108 1120 if newpos > self._chunkindex[-1][0] and not self.consumed:
1109 1121 self.read()
1110 1122 if not 0 <= newpos <= self._chunkindex[-1][0]:
1111 1123 raise ValueError('Offset out of range')
1112 1124
1113 1125 if self._pos != newpos:
1114 1126 chunk, internaloffset = self._findchunk(newpos)
1115 1127 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1116 1128 adjust = self.read(internaloffset)
1117 1129 if len(adjust) != internaloffset:
1118 1130 raise util.Abort(_('Seek failed\n'))
1119 1131 self._pos = newpos
1120 1132
1121 1133 # These are only the static capabilities.
1122 1134 # Check the 'getrepocaps' function for the rest.
1123 1135 capabilities = {'HG20': (),
1124 1136 'error': ('abort', 'unsupportedcontent', 'pushraced',
1125 1137 'pushkey'),
1126 1138 'listkeys': (),
1127 1139 'pushkey': (),
1128 1140 'digests': tuple(sorted(util.DIGESTS.keys())),
1129 1141 'remote-changegroup': ('http', 'https'),
1130 1142 'hgtagsfnodes': (),
1131 1143 }
1132 1144
1133 1145 def getrepocaps(repo, allowpushback=False):
1134 1146 """return the bundle2 capabilities for a given repo
1135 1147
1136 1148 Exists to allow extensions (like evolution) to mutate the capabilities.
1137 1149 """
1138 1150 caps = capabilities.copy()
1139 1151 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1140 1152 if obsolete.isenabled(repo, obsolete.exchangeopt):
1141 1153 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1142 1154 caps['obsmarkers'] = supportedformat
1143 1155 if allowpushback:
1144 1156 caps['pushback'] = ()
1145 1157 return caps
1146 1158
1147 1159 def bundle2caps(remote):
1148 1160 """return the bundle capabilities of a peer as dict"""
1149 1161 raw = remote.capable('bundle2')
1150 1162 if not raw and raw != '':
1151 1163 return {}
1152 1164 capsblob = urllib.unquote(remote.capable('bundle2'))
1153 1165 return decodecaps(capsblob)
1154 1166
1155 1167 def obsmarkersversion(caps):
1156 1168 """extract the list of supported obsmarkers versions from a bundle2caps dict
1157 1169 """
1158 1170 obscaps = caps.get('obsmarkers', ())
1159 1171 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1160 1172
1161 1173 @parthandler('changegroup', ('version', 'nbchanges'))
1162 1174 def handlechangegroup(op, inpart):
1163 1175 """apply a changegroup part on the repo
1164 1176
1165 1177 This is a very early implementation that will be massively reworked before
1166 1178 being inflicted on any end-user.
1167 1179 """
1168 1180 # Make sure we trigger a transaction creation
1169 1181 #
1170 1182 # The addchangegroup function will get a transaction object by itself, but
1171 1183 # we need to make sure we trigger the creation of a transaction object used
1172 1184 # for the whole processing scope.
1173 1185 op.gettransaction()
1174 1186 unpackerversion = inpart.params.get('version', '01')
1175 1187 # We should raise an appropriate exception here
1176 1188 unpacker = changegroup.packermap[unpackerversion][1]
1177 1189 cg = unpacker(inpart, None)
1178 1190 # the source and url passed here are overwritten by the one contained in
1179 1191 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1180 1192 nbchangesets = None
1181 1193 if 'nbchanges' in inpart.params:
1182 1194 nbchangesets = int(inpart.params.get('nbchanges'))
1183 1195 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1184 1196 expectedtotal=nbchangesets)
1185 1197 op.records.add('changegroup', {'return': ret})
1186 1198 if op.reply is not None:
1187 1199 # This is definitely not the final form of this
1188 1200 # return. But one needs to start somewhere.
1189 1201 part = op.reply.newpart('reply:changegroup', mandatory=False)
1190 1202 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1191 1203 part.addparam('return', '%i' % ret, mandatory=False)
1192 1204 assert not inpart.read()
1193 1205
1194 1206 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1195 1207 ['digest:%s' % k for k in util.DIGESTS.keys()])
1196 1208 @parthandler('remote-changegroup', _remotechangegroupparams)
1197 1209 def handleremotechangegroup(op, inpart):
1198 1210 """apply a bundle10 on the repo, given an url and validation information
1199 1211
1200 1212 All the information about the remote bundle to import is given as
1201 1213 parameters. The parameters include:
1202 1214 - url: the url to the bundle10.
1203 1215 - size: the bundle10 file size. It is used to validate what was
1204 1216 retrieved by the client matches the server knowledge about the bundle.
1205 1217 - digests: a space separated list of the digest types provided as
1206 1218 parameters.
1207 1219 - digest:<digest-type>: the hexadecimal representation of the digest with
1208 1220 that name. Like the size, it is used to validate what was retrieved by
1209 1221 the client matches what the server knows about the bundle.
1210 1222
1211 1223 When multiple digest types are given, all of them are checked.
1212 1224 """
1213 1225 try:
1214 1226 raw_url = inpart.params['url']
1215 1227 except KeyError:
1216 1228 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1217 1229 parsed_url = util.url(raw_url)
1218 1230 if parsed_url.scheme not in capabilities['remote-changegroup']:
1219 1231 raise util.Abort(_('remote-changegroup does not support %s urls') %
1220 1232 parsed_url.scheme)
1221 1233
1222 1234 try:
1223 1235 size = int(inpart.params['size'])
1224 1236 except ValueError:
1225 1237 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1226 1238 % 'size')
1227 1239 except KeyError:
1228 1240 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1229 1241
1230 1242 digests = {}
1231 1243 for typ in inpart.params.get('digests', '').split():
1232 1244 param = 'digest:%s' % typ
1233 1245 try:
1234 1246 value = inpart.params[param]
1235 1247 except KeyError:
1236 1248 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1237 1249 param)
1238 1250 digests[typ] = value
1239 1251
1240 1252 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1241 1253
1242 1254 # Make sure we trigger a transaction creation
1243 1255 #
1244 1256 # The addchangegroup function will get a transaction object by itself, but
1245 1257 # we need to make sure we trigger the creation of a transaction object used
1246 1258 # for the whole processing scope.
1247 1259 op.gettransaction()
1248 1260 from . import exchange
1249 1261 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1250 1262 if not isinstance(cg, changegroup.cg1unpacker):
1251 1263 raise util.Abort(_('%s: not a bundle version 1.0') %
1252 1264 util.hidepassword(raw_url))
1253 1265 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1254 1266 op.records.add('changegroup', {'return': ret})
1255 1267 if op.reply is not None:
1256 1268 # This is definitely not the final form of this
1257 1269 # return. But one needs to start somewhere.
1258 1270 part = op.reply.newpart('reply:changegroup')
1259 1271 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1260 1272 part.addparam('return', '%i' % ret, mandatory=False)
1261 1273 try:
1262 1274 real_part.validate()
1263 1275 except util.Abort as e:
1264 1276 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1265 1277 (util.hidepassword(raw_url), str(e)))
1266 1278 assert not inpart.read()
1267 1279
1268 1280 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1269 1281 def handlereplychangegroup(op, inpart):
1270 1282 ret = int(inpart.params['return'])
1271 1283 replyto = int(inpart.params['in-reply-to'])
1272 1284 op.records.add('changegroup', {'return': ret}, replyto)
1273 1285
1274 1286 @parthandler('check:heads')
1275 1287 def handlecheckheads(op, inpart):
1276 1288 """check that head of the repo did not change
1277 1289
1278 1290 This is used to detect a push race when using unbundle.
1279 1291 This replaces the "heads" argument of unbundle."""
1280 1292 h = inpart.read(20)
1281 1293 heads = []
1282 1294 while len(h) == 20:
1283 1295 heads.append(h)
1284 1296 h = inpart.read(20)
1285 1297 assert not h
1286 1298 if heads != op.repo.heads():
1287 1299 raise error.PushRaced('repository changed while pushing - '
1288 1300 'please try again')
1289 1301
1290 1302 @parthandler('output')
1291 1303 def handleoutput(op, inpart):
1292 1304 """forward output captured on the server to the client"""
1293 1305 for line in inpart.read().splitlines():
1294 1306 op.ui.status(('remote: %s\n' % line))
1295 1307
1296 1308 @parthandler('replycaps')
1297 1309 def handlereplycaps(op, inpart):
1298 1310 """Notify that a reply bundle should be created
1299 1311
1300 1312 The payload contains the capabilities information for the reply"""
1301 1313 caps = decodecaps(inpart.read())
1302 1314 if op.reply is None:
1303 1315 op.reply = bundle20(op.ui, caps)
1304 1316
1305 1317 @parthandler('error:abort', ('message', 'hint'))
1306 1318 def handleerrorabort(op, inpart):
1307 1319 """Used to transmit abort error over the wire"""
1308 1320 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1309 1321
1310 1322 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1311 1323 'in-reply-to'))
1312 1324 def handleerrorpushkey(op, inpart):
1313 1325 """Used to transmit failure of a mandatory pushkey over the wire"""
1314 1326 kwargs = {}
1315 1327 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1316 1328 value = inpart.params.get(name)
1317 1329 if value is not None:
1318 1330 kwargs[name] = value
1319 1331 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1320 1332
1321 1333 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1322 1334 def handleerrorunsupportedcontent(op, inpart):
1323 1335 """Used to transmit unknown content error over the wire"""
1324 1336 kwargs = {}
1325 1337 parttype = inpart.params.get('parttype')
1326 1338 if parttype is not None:
1327 1339 kwargs['parttype'] = parttype
1328 1340 params = inpart.params.get('params')
1329 1341 if params is not None:
1330 1342 kwargs['params'] = params.split('\0')
1331 1343
1332 1344 raise error.BundleUnknownFeatureError(**kwargs)
1333 1345
1334 1346 @parthandler('error:pushraced', ('message',))
1335 1347 def handleerrorpushraced(op, inpart):
1336 1348 """Used to transmit push race error over the wire"""
1337 1349 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1338 1350
1339 1351 @parthandler('listkeys', ('namespace',))
1340 1352 def handlelistkeys(op, inpart):
1341 1353 """retrieve pushkey namespace content stored in a bundle2"""
1342 1354 namespace = inpart.params['namespace']
1343 1355 r = pushkey.decodekeys(inpart.read())
1344 1356 op.records.add('listkeys', (namespace, r))
1345 1357
1346 1358 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1347 1359 def handlepushkey(op, inpart):
1348 1360 """process a pushkey request"""
1349 1361 dec = pushkey.decode
1350 1362 namespace = dec(inpart.params['namespace'])
1351 1363 key = dec(inpart.params['key'])
1352 1364 old = dec(inpart.params['old'])
1353 1365 new = dec(inpart.params['new'])
1354 1366 ret = op.repo.pushkey(namespace, key, old, new)
1355 1367 record = {'namespace': namespace,
1356 1368 'key': key,
1357 1369 'old': old,
1358 1370 'new': new}
1359 1371 op.records.add('pushkey', record)
1360 1372 if op.reply is not None:
1361 1373 rpart = op.reply.newpart('reply:pushkey')
1362 1374 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1363 1375 rpart.addparam('return', '%i' % ret, mandatory=False)
1364 1376 if inpart.mandatory and not ret:
1365 1377 kwargs = {}
1366 1378 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1367 1379 if key in inpart.params:
1368 1380 kwargs[key] = inpart.params[key]
1369 1381 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1370 1382
1371 1383 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1372 1384 def handlepushkeyreply(op, inpart):
1373 1385 """retrieve the result of a pushkey request"""
1374 1386 ret = int(inpart.params['return'])
1375 1387 partid = int(inpart.params['in-reply-to'])
1376 1388 op.records.add('pushkey', {'return': ret}, partid)
1377 1389
1378 1390 @parthandler('obsmarkers')
1379 1391 def handleobsmarker(op, inpart):
1380 1392 """add a stream of obsmarkers to the repo"""
1381 1393 tr = op.gettransaction()
1382 1394 markerdata = inpart.read()
1383 1395 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1384 1396 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1385 1397 % len(markerdata))
1386 1398 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1387 1399 if new:
1388 1400 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1389 1401 op.records.add('obsmarkers', {'new': new})
1390 1402 if op.reply is not None:
1391 1403 rpart = op.reply.newpart('reply:obsmarkers')
1392 1404 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1393 1405 rpart.addparam('new', '%i' % new, mandatory=False)
1394 1406
1395 1407
1396 1408 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1397 1409 def handleobsmarkerreply(op, inpart):
1398 1410 """retrieve the result of a pushkey request"""
1399 1411 ret = int(inpart.params['new'])
1400 1412 partid = int(inpart.params['in-reply-to'])
1401 1413 op.records.add('obsmarkers', {'new': ret}, partid)
1402 1414
1403 1415 @parthandler('hgtagsfnodes')
1404 1416 def handlehgtagsfnodes(op, inpart):
1405 1417 """Applies .hgtags fnodes cache entries to the local repo.
1406 1418
1407 1419 Payload is pairs of 20 byte changeset nodes and filenodes.
1408 1420 """
1409 1421 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1410 1422
1411 1423 count = 0
1412 1424 while True:
1413 1425 node = inpart.read(20)
1414 1426 fnode = inpart.read(20)
1415 1427 if len(node) < 20 or len(fnode) < 20:
1416 1428 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1417 1429 break
1418 1430 cache.setfnode(node, fnode)
1419 1431 count += 1
1420 1432
1421 1433 cache.write()
1422 1434 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)