##// END OF EJS Templates
bundle2: clarify in docstring that header size is for a single header...
Martin von Zweigbergk -
r25507:5bee4837 default
parent child Browse files
Show More
@@ -1,1406 +1,1406 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error, tags
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 176 def outdebug(ui, message):
177 177 """debug regarding output stream (bundling)"""
178 178 if ui.configbool('devel', 'bundle2.debug', False):
179 179 ui.debug('bundle2-output: %s\n' % message)
180 180
181 181 def indebug(ui, message):
182 182 """debug on input stream (unbundling)"""
183 183 if ui.configbool('devel', 'bundle2.debug', False):
184 184 ui.debug('bundle2-input: %s\n' % message)
185 185
186 186 def validateparttype(parttype):
187 187 """raise ValueError if a parttype contains invalid character"""
188 188 if _parttypeforbidden.search(parttype):
189 189 raise ValueError(parttype)
190 190
191 191 def _makefpartparamsizes(nbparams):
192 192 """return a struct format to read part parameter sizes
193 193
194 194 The number parameters is variable so we need to build that format
195 195 dynamically.
196 196 """
197 197 return '>'+('BB'*nbparams)
198 198
199 199 parthandlermapping = {}
200 200
201 201 def parthandler(parttype, params=()):
202 202 """decorator that register a function as a bundle2 part handler
203 203
204 204 eg::
205 205
206 206 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
207 207 def myparttypehandler(...):
208 208 '''process a part of type "my part".'''
209 209 ...
210 210 """
211 211 validateparttype(parttype)
212 212 def _decorator(func):
213 213 lparttype = parttype.lower() # enforce lower case matching.
214 214 assert lparttype not in parthandlermapping
215 215 parthandlermapping[lparttype] = func
216 216 func.params = frozenset(params)
217 217 return func
218 218 return _decorator
219 219
220 220 class unbundlerecords(object):
221 221 """keep record of what happens during and unbundle
222 222
223 223 New records are added using `records.add('cat', obj)`. Where 'cat' is a
224 224 category of record and obj is an arbitrary object.
225 225
226 226 `records['cat']` will return all entries of this category 'cat'.
227 227
228 228 Iterating on the object itself will yield `('category', obj)` tuples
229 229 for all entries.
230 230
231 231 All iterations happens in chronological order.
232 232 """
233 233
234 234 def __init__(self):
235 235 self._categories = {}
236 236 self._sequences = []
237 237 self._replies = {}
238 238
239 239 def add(self, category, entry, inreplyto=None):
240 240 """add a new record of a given category.
241 241
242 242 The entry can then be retrieved in the list returned by
243 243 self['category']."""
244 244 self._categories.setdefault(category, []).append(entry)
245 245 self._sequences.append((category, entry))
246 246 if inreplyto is not None:
247 247 self.getreplies(inreplyto).add(category, entry)
248 248
249 249 def getreplies(self, partid):
250 250 """get the records that are replies to a specific part"""
251 251 return self._replies.setdefault(partid, unbundlerecords())
252 252
253 253 def __getitem__(self, cat):
254 254 return tuple(self._categories.get(cat, ()))
255 255
256 256 def __iter__(self):
257 257 return iter(self._sequences)
258 258
259 259 def __len__(self):
260 260 return len(self._sequences)
261 261
262 262 def __nonzero__(self):
263 263 return bool(self._sequences)
264 264
265 265 class bundleoperation(object):
266 266 """an object that represents a single bundling process
267 267
268 268 Its purpose is to carry unbundle-related objects and states.
269 269
270 270 A new object should be created at the beginning of each bundle processing.
271 271 The object is to be returned by the processing function.
272 272
273 273 The object has very little content now it will ultimately contain:
274 274 * an access to the repo the bundle is applied to,
275 275 * a ui object,
276 276 * a way to retrieve a transaction to add changes to the repo,
277 277 * a way to record the result of processing each part,
278 278 * a way to construct a bundle response when applicable.
279 279 """
280 280
281 281 def __init__(self, repo, transactiongetter, captureoutput=True):
282 282 self.repo = repo
283 283 self.ui = repo.ui
284 284 self.records = unbundlerecords()
285 285 self.gettransaction = transactiongetter
286 286 self.reply = None
287 287 self.captureoutput = captureoutput
288 288
289 289 class TransactionUnavailable(RuntimeError):
290 290 pass
291 291
292 292 def _notransaction():
293 293 """default method to get a transaction while processing a bundle
294 294
295 295 Raise an exception to highlight the fact that no transaction was expected
296 296 to be created"""
297 297 raise TransactionUnavailable()
298 298
299 299 def processbundle(repo, unbundler, transactiongetter=None, op=None):
300 300 """This function process a bundle, apply effect to/from a repo
301 301
302 302 It iterates over each part then searches for and uses the proper handling
303 303 code to process the part. Parts are processed in order.
304 304
305 305 This is very early version of this function that will be strongly reworked
306 306 before final usage.
307 307
308 308 Unknown Mandatory part will abort the process.
309 309
310 310 It is temporarily possible to provide a prebuilt bundleoperation to the
311 311 function. This is used to ensure output is properly propagated in case of
312 312 an error during the unbundling. This output capturing part will likely be
313 313 reworked and this ability will probably go away in the process.
314 314 """
315 315 if op is None:
316 316 if transactiongetter is None:
317 317 transactiongetter = _notransaction
318 318 op = bundleoperation(repo, transactiongetter)
319 319 # todo:
320 320 # - replace this is a init function soon.
321 321 # - exception catching
322 322 unbundler.params
323 323 if repo.ui.debugflag:
324 324 msg = ['bundle2-input-bundle:']
325 325 if unbundler.params:
326 326 msg.append(' %i params')
327 327 if op.gettransaction is None:
328 328 msg.append(' no-transaction')
329 329 else:
330 330 msg.append(' with-transaction')
331 331 msg.append('\n')
332 332 repo.ui.debug(''.join(msg))
333 333 iterparts = enumerate(unbundler.iterparts())
334 334 part = None
335 335 nbpart = 0
336 336 try:
337 337 for nbpart, part in iterparts:
338 338 _processpart(op, part)
339 339 except BaseException, exc:
340 340 for nbpart, part in iterparts:
341 341 # consume the bundle content
342 342 part.seek(0, 2)
343 343 # Small hack to let caller code distinguish exceptions from bundle2
344 344 # processing from processing the old format. This is mostly
345 345 # needed to handle different return codes to unbundle according to the
346 346 # type of bundle. We should probably clean up or drop this return code
347 347 # craziness in a future version.
348 348 exc.duringunbundle2 = True
349 349 salvaged = []
350 350 replycaps = None
351 351 if op.reply is not None:
352 352 salvaged = op.reply.salvageoutput()
353 353 replycaps = op.reply.capabilities
354 354 exc._replycaps = replycaps
355 355 exc._bundle2salvagedoutput = salvaged
356 356 raise
357 357 finally:
358 358 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
359 359
360 360 return op
361 361
362 362 def _processpart(op, part):
363 363 """process a single part from a bundle
364 364
365 365 The part is guaranteed to have been fully consumed when the function exits
366 366 (even if an exception is raised)."""
367 367 status = 'unknown' # used by debug output
368 368 try:
369 369 try:
370 370 handler = parthandlermapping.get(part.type)
371 371 if handler is None:
372 372 status = 'unsupported-type'
373 373 raise error.UnsupportedPartError(parttype=part.type)
374 374 indebug(op.ui, 'found a handler for part %r' % part.type)
375 375 unknownparams = part.mandatorykeys - handler.params
376 376 if unknownparams:
377 377 unknownparams = list(unknownparams)
378 378 unknownparams.sort()
379 379 status = 'unsupported-params (%s)' % unknownparams
380 380 raise error.UnsupportedPartError(parttype=part.type,
381 381 params=unknownparams)
382 382 status = 'supported'
383 383 except error.UnsupportedPartError, exc:
384 384 if part.mandatory: # mandatory parts
385 385 raise
386 386 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
387 387 return # skip to part processing
388 388 finally:
389 389 if op.ui.debugflag:
390 390 msg = ['bundle2-input-part: "%s"' % part.type]
391 391 if not part.mandatory:
392 392 msg.append(' (advisory)')
393 393 nbmp = len(part.mandatorykeys)
394 394 nbap = len(part.params) - nbmp
395 395 if nbmp or nbap:
396 396 msg.append(' (params:')
397 397 if nbmp:
398 398 msg.append(' %i mandatory' % nbmp)
399 399 if nbap:
400 400 msg.append(' %i advisory' % nbmp)
401 401 msg.append(')')
402 402 msg.append(' %s\n' % status)
403 403 op.ui.debug(''.join(msg))
404 404
405 405 # handler is called outside the above try block so that we don't
406 406 # risk catching KeyErrors from anything other than the
407 407 # parthandlermapping lookup (any KeyError raised by handler()
408 408 # itself represents a defect of a different variety).
409 409 output = None
410 410 if op.captureoutput and op.reply is not None:
411 411 op.ui.pushbuffer(error=True, subproc=True)
412 412 output = ''
413 413 try:
414 414 handler(op, part)
415 415 finally:
416 416 if output is not None:
417 417 output = op.ui.popbuffer()
418 418 if output:
419 419 outpart = op.reply.newpart('output', data=output,
420 420 mandatory=False)
421 421 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
422 422 finally:
423 423 # consume the part content to not corrupt the stream.
424 424 part.seek(0, 2)
425 425
426 426
427 427 def decodecaps(blob):
428 428 """decode a bundle2 caps bytes blob into a dictionary
429 429
430 430 The blob is a list of capabilities (one per line)
431 431 Capabilities may have values using a line of the form::
432 432
433 433 capability=value1,value2,value3
434 434
435 435 The values are always a list."""
436 436 caps = {}
437 437 for line in blob.splitlines():
438 438 if not line:
439 439 continue
440 440 if '=' not in line:
441 441 key, vals = line, ()
442 442 else:
443 443 key, vals = line.split('=', 1)
444 444 vals = vals.split(',')
445 445 key = urllib.unquote(key)
446 446 vals = [urllib.unquote(v) for v in vals]
447 447 caps[key] = vals
448 448 return caps
449 449
450 450 def encodecaps(caps):
451 451 """encode a bundle2 caps dictionary into a bytes blob"""
452 452 chunks = []
453 453 for ca in sorted(caps):
454 454 vals = caps[ca]
455 455 ca = urllib.quote(ca)
456 456 vals = [urllib.quote(v) for v in vals]
457 457 if vals:
458 458 ca = "%s=%s" % (ca, ','.join(vals))
459 459 chunks.append(ca)
460 460 return '\n'.join(chunks)
461 461
462 462 class bundle20(object):
463 463 """represent an outgoing bundle2 container
464 464
465 465 Use the `addparam` method to add stream level parameter. and `newpart` to
466 466 populate it. Then call `getchunks` to retrieve all the binary chunks of
467 467 data that compose the bundle2 container."""
468 468
469 469 _magicstring = 'HG20'
470 470
471 471 def __init__(self, ui, capabilities=()):
472 472 self.ui = ui
473 473 self._params = []
474 474 self._parts = []
475 475 self.capabilities = dict(capabilities)
476 476
477 477 @property
478 478 def nbparts(self):
479 479 """total number of parts added to the bundler"""
480 480 return len(self._parts)
481 481
482 482 # methods used to defines the bundle2 content
483 483 def addparam(self, name, value=None):
484 484 """add a stream level parameter"""
485 485 if not name:
486 486 raise ValueError('empty parameter name')
487 487 if name[0] not in string.letters:
488 488 raise ValueError('non letter first character: %r' % name)
489 489 self._params.append((name, value))
490 490
491 491 def addpart(self, part):
492 492 """add a new part to the bundle2 container
493 493
494 494 Parts contains the actual applicative payload."""
495 495 assert part.id is None
496 496 part.id = len(self._parts) # very cheap counter
497 497 self._parts.append(part)
498 498
499 499 def newpart(self, typeid, *args, **kwargs):
500 500 """create a new part and add it to the containers
501 501
502 502 As the part is directly added to the containers. For now, this means
503 503 that any failure to properly initialize the part after calling
504 504 ``newpart`` should result in a failure of the whole bundling process.
505 505
506 506 You can still fall back to manually create and add if you need better
507 507 control."""
508 508 part = bundlepart(typeid, *args, **kwargs)
509 509 self.addpart(part)
510 510 return part
511 511
512 512 # methods used to generate the bundle2 stream
513 513 def getchunks(self):
514 514 if self.ui.debugflag:
515 515 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
516 516 if self._params:
517 517 msg.append(' (%i params)' % len(self._params))
518 518 msg.append(' %i parts total\n' % len(self._parts))
519 519 self.ui.debug(''.join(msg))
520 520 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
521 521 yield self._magicstring
522 522 param = self._paramchunk()
523 523 outdebug(self.ui, 'bundle parameter: %s' % param)
524 524 yield _pack(_fstreamparamsize, len(param))
525 525 if param:
526 526 yield param
527 527
528 528 outdebug(self.ui, 'start of parts')
529 529 for part in self._parts:
530 530 outdebug(self.ui, 'bundle part: "%s"' % part.type)
531 531 for chunk in part.getchunks(ui=self.ui):
532 532 yield chunk
533 533 outdebug(self.ui, 'end of bundle')
534 534 yield _pack(_fpartheadersize, 0)
535 535
536 536 def _paramchunk(self):
537 537 """return a encoded version of all stream parameters"""
538 538 blocks = []
539 539 for par, value in self._params:
540 540 par = urllib.quote(par)
541 541 if value is not None:
542 542 value = urllib.quote(value)
543 543 par = '%s=%s' % (par, value)
544 544 blocks.append(par)
545 545 return ' '.join(blocks)
546 546
547 547 def salvageoutput(self):
548 548 """return a list with a copy of all output parts in the bundle
549 549
550 550 This is meant to be used during error handling to make sure we preserve
551 551 server output"""
552 552 salvaged = []
553 553 for part in self._parts:
554 554 if part.type.startswith('output'):
555 555 salvaged.append(part.copy())
556 556 return salvaged
557 557
558 558
559 559 class unpackermixin(object):
560 560 """A mixin to extract bytes and struct data from a stream"""
561 561
562 562 def __init__(self, fp):
563 563 self._fp = fp
564 564 self._seekable = (util.safehasattr(fp, 'seek') and
565 565 util.safehasattr(fp, 'tell'))
566 566
567 567 def _unpack(self, format):
568 568 """unpack this struct format from the stream"""
569 569 data = self._readexact(struct.calcsize(format))
570 570 return _unpack(format, data)
571 571
572 572 def _readexact(self, size):
573 573 """read exactly <size> bytes from the stream"""
574 574 return changegroup.readexactly(self._fp, size)
575 575
576 576 def seek(self, offset, whence=0):
577 577 """move the underlying file pointer"""
578 578 if self._seekable:
579 579 return self._fp.seek(offset, whence)
580 580 else:
581 581 raise NotImplementedError(_('File pointer is not seekable'))
582 582
583 583 def tell(self):
584 584 """return the file offset, or None if file is not seekable"""
585 585 if self._seekable:
586 586 try:
587 587 return self._fp.tell()
588 588 except IOError, e:
589 589 if e.errno == errno.ESPIPE:
590 590 self._seekable = False
591 591 else:
592 592 raise
593 593 return None
594 594
595 595 def close(self):
596 596 """close underlying file"""
597 597 if util.safehasattr(self._fp, 'close'):
598 598 return self._fp.close()
599 599
600 600 def getunbundler(ui, fp, header=None):
601 601 """return a valid unbundler object for a given header"""
602 602 if header is None:
603 603 header = changegroup.readexactly(fp, 4)
604 604 magic, version = header[0:2], header[2:4]
605 605 if magic != 'HG':
606 606 raise util.Abort(_('not a Mercurial bundle'))
607 607 unbundlerclass = formatmap.get(version)
608 608 if unbundlerclass is None:
609 609 raise util.Abort(_('unknown bundle version %s') % version)
610 610 unbundler = unbundlerclass(ui, fp)
611 611 indebug(ui, 'start processing of %s stream' % header)
612 612 return unbundler
613 613
614 614 class unbundle20(unpackermixin):
615 615 """interpret a bundle2 stream
616 616
617 617 This class is fed with a binary stream and yields parts through its
618 618 `iterparts` methods."""
619 619
620 620 def __init__(self, ui, fp):
621 621 """If header is specified, we do not read it out of the stream."""
622 622 self.ui = ui
623 623 super(unbundle20, self).__init__(fp)
624 624
625 625 @util.propertycache
626 626 def params(self):
627 627 """dictionary of stream level parameters"""
628 628 indebug(self.ui, 'reading bundle2 stream parameters')
629 629 params = {}
630 630 paramssize = self._unpack(_fstreamparamsize)[0]
631 631 if paramssize < 0:
632 632 raise error.BundleValueError('negative bundle param size: %i'
633 633 % paramssize)
634 634 if paramssize:
635 635 for p in self._readexact(paramssize).split(' '):
636 636 p = p.split('=', 1)
637 637 p = [urllib.unquote(i) for i in p]
638 638 if len(p) < 2:
639 639 p.append(None)
640 640 self._processparam(*p)
641 641 params[p[0]] = p[1]
642 642 return params
643 643
644 644 def _processparam(self, name, value):
645 645 """process a parameter, applying its effect if needed
646 646
647 647 Parameter starting with a lower case letter are advisory and will be
648 648 ignored when unknown. Those starting with an upper case letter are
649 649 mandatory and will this function will raise a KeyError when unknown.
650 650
651 651 Note: no option are currently supported. Any input will be either
652 652 ignored or failing.
653 653 """
654 654 if not name:
655 655 raise ValueError('empty parameter name')
656 656 if name[0] not in string.letters:
657 657 raise ValueError('non letter first character: %r' % name)
658 658 # Some logic will be later added here to try to process the option for
659 659 # a dict of known parameter.
660 660 if name[0].islower():
661 661 indebug(self.ui, "ignoring unknown parameter %r" % name)
662 662 else:
663 663 raise error.UnsupportedPartError(params=(name,))
664 664
665 665
666 666 def iterparts(self):
667 667 """yield all parts contained in the stream"""
668 668 # make sure param have been loaded
669 669 self.params
670 670 indebug(self.ui, 'start extraction of bundle2 parts')
671 671 headerblock = self._readpartheader()
672 672 while headerblock is not None:
673 673 part = unbundlepart(self.ui, headerblock, self._fp)
674 674 yield part
675 675 part.seek(0, 2)
676 676 headerblock = self._readpartheader()
677 677 indebug(self.ui, 'end of bundle2 stream')
678 678
679 679 def _readpartheader(self):
680 680 """reads a part header size and return the bytes blob
681 681
682 682 returns None if empty"""
683 683 headersize = self._unpack(_fpartheadersize)[0]
684 684 if headersize < 0:
685 685 raise error.BundleValueError('negative part header size: %i'
686 686 % headersize)
687 687 indebug(self.ui, 'part header size: %i' % headersize)
688 688 if headersize:
689 689 return self._readexact(headersize)
690 690 return None
691 691
692 692 def compressed(self):
693 693 return False
694 694
695 695 formatmap = {'20': unbundle20}
696 696
697 697 class bundlepart(object):
698 698 """A bundle2 part contains application level payload
699 699
700 700 The part `type` is used to route the part to the application level
701 701 handler.
702 702
703 703 The part payload is contained in ``part.data``. It could be raw bytes or a
704 704 generator of byte chunks.
705 705
706 706 You can add parameters to the part using the ``addparam`` method.
707 707 Parameters can be either mandatory (default) or advisory. Remote side
708 708 should be able to safely ignore the advisory ones.
709 709
710 710 Both data and parameters cannot be modified after the generation has begun.
711 711 """
712 712
713 713 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
714 714 data='', mandatory=True):
715 715 validateparttype(parttype)
716 716 self.id = None
717 717 self.type = parttype
718 718 self._data = data
719 719 self._mandatoryparams = list(mandatoryparams)
720 720 self._advisoryparams = list(advisoryparams)
721 721 # checking for duplicated entries
722 722 self._seenparams = set()
723 723 for pname, __ in self._mandatoryparams + self._advisoryparams:
724 724 if pname in self._seenparams:
725 725 raise RuntimeError('duplicated params: %s' % pname)
726 726 self._seenparams.add(pname)
727 727 # status of the part's generation:
728 728 # - None: not started,
729 729 # - False: currently generated,
730 730 # - True: generation done.
731 731 self._generated = None
732 732 self.mandatory = mandatory
733 733
734 734 def copy(self):
735 735 """return a copy of the part
736 736
737 737 The new part have the very same content but no partid assigned yet.
738 738 Parts with generated data cannot be copied."""
739 739 assert not util.safehasattr(self.data, 'next')
740 740 return self.__class__(self.type, self._mandatoryparams,
741 741 self._advisoryparams, self._data, self.mandatory)
742 742
743 743 # methods used to defines the part content
744 744 def __setdata(self, data):
745 745 if self._generated is not None:
746 746 raise error.ReadOnlyPartError('part is being generated')
747 747 self._data = data
748 748 def __getdata(self):
749 749 return self._data
750 750 data = property(__getdata, __setdata)
751 751
752 752 @property
753 753 def mandatoryparams(self):
754 754 # make it an immutable tuple to force people through ``addparam``
755 755 return tuple(self._mandatoryparams)
756 756
757 757 @property
758 758 def advisoryparams(self):
759 759 # make it an immutable tuple to force people through ``addparam``
760 760 return tuple(self._advisoryparams)
761 761
762 762 def addparam(self, name, value='', mandatory=True):
763 763 if self._generated is not None:
764 764 raise error.ReadOnlyPartError('part is being generated')
765 765 if name in self._seenparams:
766 766 raise ValueError('duplicated params: %s' % name)
767 767 self._seenparams.add(name)
768 768 params = self._advisoryparams
769 769 if mandatory:
770 770 params = self._mandatoryparams
771 771 params.append((name, value))
772 772
773 773 # methods used to generates the bundle2 stream
774 774 def getchunks(self, ui):
775 775 if self._generated is not None:
776 776 raise RuntimeError('part can only be consumed once')
777 777 self._generated = False
778 778
779 779 if ui.debugflag:
780 780 msg = ['bundle2-output-part: "%s"' % self.type]
781 781 if not self.mandatory:
782 782 msg.append(' (advisory)')
783 783 nbmp = len(self.mandatoryparams)
784 784 nbap = len(self.advisoryparams)
785 785 if nbmp or nbap:
786 786 msg.append(' (params:')
787 787 if nbmp:
788 788 msg.append(' %i mandatory' % nbmp)
789 789 if nbap:
790 790 msg.append(' %i advisory' % nbmp)
791 791 msg.append(')')
792 792 if not self.data:
793 793 msg.append(' empty payload')
794 794 elif util.safehasattr(self.data, 'next'):
795 795 msg.append(' streamed payload')
796 796 else:
797 797 msg.append(' %i bytes payload' % len(self.data))
798 798 msg.append('\n')
799 799 ui.debug(''.join(msg))
800 800
801 801 #### header
802 802 if self.mandatory:
803 803 parttype = self.type.upper()
804 804 else:
805 805 parttype = self.type.lower()
806 806 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
807 807 ## parttype
808 808 header = [_pack(_fparttypesize, len(parttype)),
809 809 parttype, _pack(_fpartid, self.id),
810 810 ]
811 811 ## parameters
812 812 # count
813 813 manpar = self.mandatoryparams
814 814 advpar = self.advisoryparams
815 815 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
816 816 # size
817 817 parsizes = []
818 818 for key, value in manpar:
819 819 parsizes.append(len(key))
820 820 parsizes.append(len(value))
821 821 for key, value in advpar:
822 822 parsizes.append(len(key))
823 823 parsizes.append(len(value))
824 824 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
825 825 header.append(paramsizes)
826 826 # key, value
827 827 for key, value in manpar:
828 828 header.append(key)
829 829 header.append(value)
830 830 for key, value in advpar:
831 831 header.append(key)
832 832 header.append(value)
833 833 ## finalize header
834 834 headerchunk = ''.join(header)
835 835 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
836 836 yield _pack(_fpartheadersize, len(headerchunk))
837 837 yield headerchunk
838 838 ## payload
839 839 try:
840 840 for chunk in self._payloadchunks():
841 841 outdebug(ui, 'payload chunk size: %i' % len(chunk))
842 842 yield _pack(_fpayloadsize, len(chunk))
843 843 yield chunk
844 844 except BaseException, exc:
845 845 # backup exception data for later
846 846 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
847 847 % exc)
848 848 exc_info = sys.exc_info()
849 849 msg = 'unexpected error: %s' % exc
850 850 interpart = bundlepart('error:abort', [('message', msg)],
851 851 mandatory=False)
852 852 interpart.id = 0
853 853 yield _pack(_fpayloadsize, -1)
854 854 for chunk in interpart.getchunks(ui=ui):
855 855 yield chunk
856 856 outdebug(ui, 'closing payload chunk')
857 857 # abort current part payload
858 858 yield _pack(_fpayloadsize, 0)
859 859 raise exc_info[0], exc_info[1], exc_info[2]
860 860 # end of payload
861 861 outdebug(ui, 'closing payload chunk')
862 862 yield _pack(_fpayloadsize, 0)
863 863 self._generated = True
864 864
865 865 def _payloadchunks(self):
866 866 """yield chunks of a the part payload
867 867
868 868 Exists to handle the different methods to provide data to a part."""
869 869 # we only support fixed size data now.
870 870 # This will be improved in the future.
871 871 if util.safehasattr(self.data, 'next'):
872 872 buff = util.chunkbuffer(self.data)
873 873 chunk = buff.read(preferedchunksize)
874 874 while chunk:
875 875 yield chunk
876 876 chunk = buff.read(preferedchunksize)
877 877 elif len(self.data):
878 878 yield self.data
879 879
880 880
881 881 flaginterrupt = -1
882 882
883 883 class interrupthandler(unpackermixin):
884 884 """read one part and process it with restricted capability
885 885
886 886 This allows to transmit exception raised on the producer size during part
887 887 iteration while the consumer is reading a part.
888 888
889 889 Part processed in this manner only have access to a ui object,"""
890 890
891 891 def __init__(self, ui, fp):
892 892 super(interrupthandler, self).__init__(fp)
893 893 self.ui = ui
894 894
895 895 def _readpartheader(self):
896 896 """reads a part header size and return the bytes blob
897 897
898 898 returns None if empty"""
899 899 headersize = self._unpack(_fpartheadersize)[0]
900 900 if headersize < 0:
901 901 raise error.BundleValueError('negative part header size: %i'
902 902 % headersize)
903 903 indebug(self.ui, 'part header size: %i\n' % headersize)
904 904 if headersize:
905 905 return self._readexact(headersize)
906 906 return None
907 907
908 908 def __call__(self):
909 909
910 910 self.ui.debug('bundle2-input-stream-interrupt:'
911 911 ' opening out of band context\n')
912 912 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
913 913 headerblock = self._readpartheader()
914 914 if headerblock is None:
915 915 indebug(self.ui, 'no part found during interruption.')
916 916 return
917 917 part = unbundlepart(self.ui, headerblock, self._fp)
918 918 op = interruptoperation(self.ui)
919 919 _processpart(op, part)
920 920 self.ui.debug('bundle2-input-stream-interrupt:'
921 921 ' closing out of band context\n')
922 922
923 923 class interruptoperation(object):
924 924 """A limited operation to be use by part handler during interruption
925 925
926 926 It only have access to an ui object.
927 927 """
928 928
929 929 def __init__(self, ui):
930 930 self.ui = ui
931 931 self.reply = None
932 932 self.captureoutput = False
933 933
934 934 @property
935 935 def repo(self):
936 936 raise RuntimeError('no repo access from stream interruption')
937 937
938 938 def gettransaction(self):
939 939 raise TransactionUnavailable('no repo access from stream interruption')
940 940
941 941 class unbundlepart(unpackermixin):
942 942 """a bundle part read from a bundle"""
943 943
944 944 def __init__(self, ui, header, fp):
945 945 super(unbundlepart, self).__init__(fp)
946 946 self.ui = ui
947 947 # unbundle state attr
948 948 self._headerdata = header
949 949 self._headeroffset = 0
950 950 self._initialized = False
951 951 self.consumed = False
952 952 # part data
953 953 self.id = None
954 954 self.type = None
955 955 self.mandatoryparams = None
956 956 self.advisoryparams = None
957 957 self.params = None
958 958 self.mandatorykeys = ()
959 959 self._payloadstream = None
960 960 self._readheader()
961 961 self._mandatory = None
962 962 self._chunkindex = [] #(payload, file) position tuples for chunk starts
963 963 self._pos = 0
964 964
965 965 def _fromheader(self, size):
966 966 """return the next <size> byte from the header"""
967 967 offset = self._headeroffset
968 968 data = self._headerdata[offset:(offset + size)]
969 969 self._headeroffset = offset + size
970 970 return data
971 971
972 972 def _unpackheader(self, format):
973 973 """read given format from header
974 974
975 975 This automatically compute the size of the format to read."""
976 976 data = self._fromheader(struct.calcsize(format))
977 977 return _unpack(format, data)
978 978
979 979 def _initparams(self, mandatoryparams, advisoryparams):
980 980 """internal function to setup all logic related parameters"""
981 981 # make it read only to prevent people touching it by mistake.
982 982 self.mandatoryparams = tuple(mandatoryparams)
983 983 self.advisoryparams = tuple(advisoryparams)
984 984 # user friendly UI
985 985 self.params = dict(self.mandatoryparams)
986 986 self.params.update(dict(self.advisoryparams))
987 987 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
988 988
989 989 def _payloadchunks(self, chunknum=0):
990 990 '''seek to specified chunk and start yielding data'''
991 991 if len(self._chunkindex) == 0:
992 992 assert chunknum == 0, 'Must start with chunk 0'
993 993 self._chunkindex.append((0, super(unbundlepart, self).tell()))
994 994 else:
995 995 assert chunknum < len(self._chunkindex), \
996 996 'Unknown chunk %d' % chunknum
997 997 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
998 998
999 999 pos = self._chunkindex[chunknum][0]
1000 1000 payloadsize = self._unpack(_fpayloadsize)[0]
1001 1001 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1002 1002 while payloadsize:
1003 1003 if payloadsize == flaginterrupt:
1004 1004 # interruption detection, the handler will now read a
1005 1005 # single part and process it.
1006 1006 interrupthandler(self.ui, self._fp)()
1007 1007 elif payloadsize < 0:
1008 1008 msg = 'negative payload chunk size: %i' % payloadsize
1009 1009 raise error.BundleValueError(msg)
1010 1010 else:
1011 1011 result = self._readexact(payloadsize)
1012 1012 chunknum += 1
1013 1013 pos += payloadsize
1014 1014 if chunknum == len(self._chunkindex):
1015 1015 self._chunkindex.append((pos,
1016 1016 super(unbundlepart, self).tell()))
1017 1017 yield result
1018 1018 payloadsize = self._unpack(_fpayloadsize)[0]
1019 1019 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1020 1020
1021 1021 def _findchunk(self, pos):
1022 1022 '''for a given payload position, return a chunk number and offset'''
1023 1023 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1024 1024 if ppos == pos:
1025 1025 return chunk, 0
1026 1026 elif ppos > pos:
1027 1027 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1028 1028 raise ValueError('Unknown chunk')
1029 1029
1030 1030 def _readheader(self):
1031 1031 """read the header and setup the object"""
1032 1032 typesize = self._unpackheader(_fparttypesize)[0]
1033 1033 self.type = self._fromheader(typesize)
1034 1034 indebug(self.ui, 'part type: "%s"' % self.type)
1035 1035 self.id = self._unpackheader(_fpartid)[0]
1036 1036 indebug(self.ui, 'part id: "%s"' % self.id)
1037 1037 # extract mandatory bit from type
1038 1038 self.mandatory = (self.type != self.type.lower())
1039 1039 self.type = self.type.lower()
1040 1040 ## reading parameters
1041 1041 # param count
1042 1042 mancount, advcount = self._unpackheader(_fpartparamcount)
1043 1043 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1044 1044 # param size
1045 1045 fparamsizes = _makefpartparamsizes(mancount + advcount)
1046 1046 paramsizes = self._unpackheader(fparamsizes)
1047 1047 # make it a list of couple again
1048 1048 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1049 1049 # split mandatory from advisory
1050 1050 mansizes = paramsizes[:mancount]
1051 1051 advsizes = paramsizes[mancount:]
1052 1052 # retrieve param value
1053 1053 manparams = []
1054 1054 for key, value in mansizes:
1055 1055 manparams.append((self._fromheader(key), self._fromheader(value)))
1056 1056 advparams = []
1057 1057 for key, value in advsizes:
1058 1058 advparams.append((self._fromheader(key), self._fromheader(value)))
1059 1059 self._initparams(manparams, advparams)
1060 1060 ## part payload
1061 1061 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1062 1062 # we read the data, tell it
1063 1063 self._initialized = True
1064 1064
1065 1065 def read(self, size=None):
1066 1066 """read payload data"""
1067 1067 if not self._initialized:
1068 1068 self._readheader()
1069 1069 if size is None:
1070 1070 data = self._payloadstream.read()
1071 1071 else:
1072 1072 data = self._payloadstream.read(size)
1073 1073 self._pos += len(data)
1074 1074 if size is None or len(data) < size:
1075 1075 if not self.consumed and self._pos:
1076 1076 self.ui.debug('bundle2-input-part: total payload size %i\n'
1077 1077 % self._pos)
1078 1078 self.consumed = True
1079 1079 return data
1080 1080
1081 1081 def tell(self):
1082 1082 return self._pos
1083 1083
1084 1084 def seek(self, offset, whence=0):
1085 1085 if whence == 0:
1086 1086 newpos = offset
1087 1087 elif whence == 1:
1088 1088 newpos = self._pos + offset
1089 1089 elif whence == 2:
1090 1090 if not self.consumed:
1091 1091 self.read()
1092 1092 newpos = self._chunkindex[-1][0] - offset
1093 1093 else:
1094 1094 raise ValueError('Unknown whence value: %r' % (whence,))
1095 1095
1096 1096 if newpos > self._chunkindex[-1][0] and not self.consumed:
1097 1097 self.read()
1098 1098 if not 0 <= newpos <= self._chunkindex[-1][0]:
1099 1099 raise ValueError('Offset out of range')
1100 1100
1101 1101 if self._pos != newpos:
1102 1102 chunk, internaloffset = self._findchunk(newpos)
1103 1103 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1104 1104 adjust = self.read(internaloffset)
1105 1105 if len(adjust) != internaloffset:
1106 1106 raise util.Abort(_('Seek failed\n'))
1107 1107 self._pos = newpos
1108 1108
1109 1109 # These are only the static capabilities.
1110 1110 # Check the 'getrepocaps' function for the rest.
1111 1111 capabilities = {'HG20': (),
1112 1112 'error': ('abort', 'unsupportedcontent', 'pushraced',
1113 1113 'pushkey'),
1114 1114 'listkeys': (),
1115 1115 'pushkey': (),
1116 1116 'digests': tuple(sorted(util.DIGESTS.keys())),
1117 1117 'remote-changegroup': ('http', 'https'),
1118 1118 'hgtagsfnodes': (),
1119 1119 }
1120 1120
1121 1121 def getrepocaps(repo, allowpushback=False):
1122 1122 """return the bundle2 capabilities for a given repo
1123 1123
1124 1124 Exists to allow extensions (like evolution) to mutate the capabilities.
1125 1125 """
1126 1126 caps = capabilities.copy()
1127 1127 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1128 1128 if obsolete.isenabled(repo, obsolete.exchangeopt):
1129 1129 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1130 1130 caps['obsmarkers'] = supportedformat
1131 1131 if allowpushback:
1132 1132 caps['pushback'] = ()
1133 1133 return caps
1134 1134
1135 1135 def bundle2caps(remote):
1136 1136 """return the bundle capabilities of a peer as dict"""
1137 1137 raw = remote.capable('bundle2')
1138 1138 if not raw and raw != '':
1139 1139 return {}
1140 1140 capsblob = urllib.unquote(remote.capable('bundle2'))
1141 1141 return decodecaps(capsblob)
1142 1142
1143 1143 def obsmarkersversion(caps):
1144 1144 """extract the list of supported obsmarkers versions from a bundle2caps dict
1145 1145 """
1146 1146 obscaps = caps.get('obsmarkers', ())
1147 1147 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1148 1148
1149 1149 @parthandler('changegroup', ('version',))
1150 1150 def handlechangegroup(op, inpart):
1151 1151 """apply a changegroup part on the repo
1152 1152
1153 1153 This is a very early implementation that will massive rework before being
1154 1154 inflicted to any end-user.
1155 1155 """
1156 1156 # Make sure we trigger a transaction creation
1157 1157 #
1158 1158 # The addchangegroup function will get a transaction object by itself, but
1159 1159 # we need to make sure we trigger the creation of a transaction object used
1160 1160 # for the whole processing scope.
1161 1161 op.gettransaction()
1162 1162 unpackerversion = inpart.params.get('version', '01')
1163 1163 # We should raise an appropriate exception here
1164 1164 unpacker = changegroup.packermap[unpackerversion][1]
1165 1165 cg = unpacker(inpart, 'UN')
1166 1166 # the source and url passed here are overwritten by the one contained in
1167 1167 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1168 1168 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1169 1169 op.records.add('changegroup', {'return': ret})
1170 1170 if op.reply is not None:
1171 1171 # This is definitely not the final form of this
1172 1172 # return. But one need to start somewhere.
1173 1173 part = op.reply.newpart('reply:changegroup', mandatory=False)
1174 1174 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1175 1175 part.addparam('return', '%i' % ret, mandatory=False)
1176 1176 assert not inpart.read()
1177 1177
1178 1178 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1179 1179 ['digest:%s' % k for k in util.DIGESTS.keys()])
1180 1180 @parthandler('remote-changegroup', _remotechangegroupparams)
1181 1181 def handleremotechangegroup(op, inpart):
1182 1182 """apply a bundle10 on the repo, given an url and validation information
1183 1183
1184 1184 All the information about the remote bundle to import are given as
1185 1185 parameters. The parameters include:
1186 1186 - url: the url to the bundle10.
1187 1187 - size: the bundle10 file size. It is used to validate what was
1188 1188 retrieved by the client matches the server knowledge about the bundle.
1189 1189 - digests: a space separated list of the digest types provided as
1190 1190 parameters.
1191 1191 - digest:<digest-type>: the hexadecimal representation of the digest with
1192 1192 that name. Like the size, it is used to validate what was retrieved by
1193 1193 the client matches what the server knows about the bundle.
1194 1194
1195 1195 When multiple digest types are given, all of them are checked.
1196 1196 """
1197 1197 try:
1198 1198 raw_url = inpart.params['url']
1199 1199 except KeyError:
1200 1200 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1201 1201 parsed_url = util.url(raw_url)
1202 1202 if parsed_url.scheme not in capabilities['remote-changegroup']:
1203 1203 raise util.Abort(_('remote-changegroup does not support %s urls') %
1204 1204 parsed_url.scheme)
1205 1205
1206 1206 try:
1207 1207 size = int(inpart.params['size'])
1208 1208 except ValueError:
1209 1209 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1210 1210 % 'size')
1211 1211 except KeyError:
1212 1212 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1213 1213
1214 1214 digests = {}
1215 1215 for typ in inpart.params.get('digests', '').split():
1216 1216 param = 'digest:%s' % typ
1217 1217 try:
1218 1218 value = inpart.params[param]
1219 1219 except KeyError:
1220 1220 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1221 1221 param)
1222 1222 digests[typ] = value
1223 1223
1224 1224 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1225 1225
1226 1226 # Make sure we trigger a transaction creation
1227 1227 #
1228 1228 # The addchangegroup function will get a transaction object by itself, but
1229 1229 # we need to make sure we trigger the creation of a transaction object used
1230 1230 # for the whole processing scope.
1231 1231 op.gettransaction()
1232 1232 import exchange
1233 1233 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1234 1234 if not isinstance(cg, changegroup.cg1unpacker):
1235 1235 raise util.Abort(_('%s: not a bundle version 1.0') %
1236 1236 util.hidepassword(raw_url))
1237 1237 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1238 1238 op.records.add('changegroup', {'return': ret})
1239 1239 if op.reply is not None:
1240 1240 # This is definitely not the final form of this
1241 1241 # return. But one need to start somewhere.
1242 1242 part = op.reply.newpart('reply:changegroup')
1243 1243 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1244 1244 part.addparam('return', '%i' % ret, mandatory=False)
1245 1245 try:
1246 1246 real_part.validate()
1247 1247 except util.Abort, e:
1248 1248 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1249 1249 (util.hidepassword(raw_url), str(e)))
1250 1250 assert not inpart.read()
1251 1251
1252 1252 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1253 1253 def handlereplychangegroup(op, inpart):
1254 1254 ret = int(inpart.params['return'])
1255 1255 replyto = int(inpart.params['in-reply-to'])
1256 1256 op.records.add('changegroup', {'return': ret}, replyto)
1257 1257
1258 1258 @parthandler('check:heads')
1259 1259 def handlecheckheads(op, inpart):
1260 1260 """check that head of the repo did not change
1261 1261
1262 1262 This is used to detect a push race when using unbundle.
1263 1263 This replaces the "heads" argument of unbundle."""
1264 1264 h = inpart.read(20)
1265 1265 heads = []
1266 1266 while len(h) == 20:
1267 1267 heads.append(h)
1268 1268 h = inpart.read(20)
1269 1269 assert not h
1270 1270 if heads != op.repo.heads():
1271 1271 raise error.PushRaced('repository changed while pushing - '
1272 1272 'please try again')
1273 1273
1274 1274 @parthandler('output')
1275 1275 def handleoutput(op, inpart):
1276 1276 """forward output captured on the server to the client"""
1277 1277 for line in inpart.read().splitlines():
1278 1278 op.ui.status(('remote: %s\n' % line))
1279 1279
1280 1280 @parthandler('replycaps')
1281 1281 def handlereplycaps(op, inpart):
1282 1282 """Notify that a reply bundle should be created
1283 1283
1284 1284 The payload contains the capabilities information for the reply"""
1285 1285 caps = decodecaps(inpart.read())
1286 1286 if op.reply is None:
1287 1287 op.reply = bundle20(op.ui, caps)
1288 1288
1289 1289 @parthandler('error:abort', ('message', 'hint'))
1290 1290 def handleerrorabort(op, inpart):
1291 1291 """Used to transmit abort error over the wire"""
1292 1292 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1293 1293
1294 1294 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1295 1295 'in-reply-to'))
1296 1296 def handleerrorpushkey(op, inpart):
1297 1297 """Used to transmit failure of a mandatory pushkey over the wire"""
1298 1298 kwargs = {}
1299 1299 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1300 1300 value = inpart.params.get(name)
1301 1301 if value is not None:
1302 1302 kwargs[name] = value
1303 1303 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1304 1304
1305 1305 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1306 1306 def handleerrorunsupportedcontent(op, inpart):
1307 1307 """Used to transmit unknown content error over the wire"""
1308 1308 kwargs = {}
1309 1309 parttype = inpart.params.get('parttype')
1310 1310 if parttype is not None:
1311 1311 kwargs['parttype'] = parttype
1312 1312 params = inpart.params.get('params')
1313 1313 if params is not None:
1314 1314 kwargs['params'] = params.split('\0')
1315 1315
1316 1316 raise error.UnsupportedPartError(**kwargs)
1317 1317
1318 1318 @parthandler('error:pushraced', ('message',))
1319 1319 def handleerrorpushraced(op, inpart):
1320 1320 """Used to transmit push race error over the wire"""
1321 1321 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1322 1322
1323 1323 @parthandler('listkeys', ('namespace',))
1324 1324 def handlelistkeys(op, inpart):
1325 1325 """retrieve pushkey namespace content stored in a bundle2"""
1326 1326 namespace = inpart.params['namespace']
1327 1327 r = pushkey.decodekeys(inpart.read())
1328 1328 op.records.add('listkeys', (namespace, r))
1329 1329
1330 1330 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1331 1331 def handlepushkey(op, inpart):
1332 1332 """process a pushkey request"""
1333 1333 dec = pushkey.decode
1334 1334 namespace = dec(inpart.params['namespace'])
1335 1335 key = dec(inpart.params['key'])
1336 1336 old = dec(inpart.params['old'])
1337 1337 new = dec(inpart.params['new'])
1338 1338 ret = op.repo.pushkey(namespace, key, old, new)
1339 1339 record = {'namespace': namespace,
1340 1340 'key': key,
1341 1341 'old': old,
1342 1342 'new': new}
1343 1343 op.records.add('pushkey', record)
1344 1344 if op.reply is not None:
1345 1345 rpart = op.reply.newpart('reply:pushkey')
1346 1346 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1347 1347 rpart.addparam('return', '%i' % ret, mandatory=False)
1348 1348 if inpart.mandatory and not ret:
1349 1349 kwargs = {}
1350 1350 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1351 1351 if key in inpart.params:
1352 1352 kwargs[key] = inpart.params[key]
1353 1353 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1354 1354
1355 1355 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1356 1356 def handlepushkeyreply(op, inpart):
1357 1357 """retrieve the result of a pushkey request"""
1358 1358 ret = int(inpart.params['return'])
1359 1359 partid = int(inpart.params['in-reply-to'])
1360 1360 op.records.add('pushkey', {'return': ret}, partid)
1361 1361
1362 1362 @parthandler('obsmarkers')
1363 1363 def handleobsmarker(op, inpart):
1364 1364 """add a stream of obsmarkers to the repo"""
1365 1365 tr = op.gettransaction()
1366 1366 markerdata = inpart.read()
1367 1367 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1368 1368 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1369 1369 % len(markerdata))
1370 1370 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1371 1371 if new:
1372 1372 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1373 1373 op.records.add('obsmarkers', {'new': new})
1374 1374 if op.reply is not None:
1375 1375 rpart = op.reply.newpart('reply:obsmarkers')
1376 1376 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1377 1377 rpart.addparam('new', '%i' % new, mandatory=False)
1378 1378
1379 1379
1380 1380 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1381 1381 def handleobsmarkerreply(op, inpart):
1382 1382 """retrieve the result of a pushkey request"""
1383 1383 ret = int(inpart.params['new'])
1384 1384 partid = int(inpart.params['in-reply-to'])
1385 1385 op.records.add('obsmarkers', {'new': ret}, partid)
1386 1386
1387 1387 @parthandler('hgtagsfnodes')
1388 1388 def handlehgtagsfnodes(op, inpart):
1389 1389 """Applies .hgtags fnodes cache entries to the local repo.
1390 1390
1391 1391 Payload is pairs of 20 byte changeset nodes and filenodes.
1392 1392 """
1393 1393 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1394 1394
1395 1395 count = 0
1396 1396 while True:
1397 1397 node = inpart.read(20)
1398 1398 fnode = inpart.read(20)
1399 1399 if len(node) < 20 or len(fnode) < 20:
1400 1400 op.ui.debug('received incomplete .hgtags fnodes data, ignoring\n')
1401 1401 break
1402 1402 cache.setfnode(node, fnode)
1403 1403 count += 1
1404 1404
1405 1405 cache.write()
1406 1406 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now