bundle2: add generic debug output regarding generated interruption...
Pierre-Yves David -
r25324:deed9c6b default
@@ -1,1316 +1,1318 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
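As a quick illustration of the layout above, here is a minimal sketch that round-trips stream level parameters. The helper names are hypothetical, and it uses Python 3's `urllib.parse` where the module itself uses the Python 2 `urllib` API:

```python
import struct
from urllib.parse import quote, unquote

def encodestreamparams(params):
    """Serialize (name, value-or-None) pairs: urlquoted, space separated,
    preceded by the blob size as a big-endian int32."""
    blocks = []
    for name, value in params:
        block = quote(name)
        if value is not None:
            block = '%s=%s' % (block, quote(value))
        blocks.append(block)
    blob = ' '.join(blocks).encode('ascii')
    return struct.pack('>i', len(blob)) + blob

def decodestreamparams(data):
    """Parse a stream-parameter block back into a dict."""
    size = struct.unpack('>i', data[:4])[0]
    params = {}
    blob = data[4:4 + size].decode('ascii')
    if blob:
        for item in blob.split(' '):
            pieces = item.split('=', 1)
            key = unquote(pieces[0])
            params[key] = unquote(pieces[1]) if len(pieces) > 1 else None
    return params
```

Note that a value-less parameter (e.g. a mandatory flag like `Check`) decodes to `None`, matching the `<name>` vs `<name>=<value>` distinction above.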
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
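The parameter block described above can be packed with `struct` in a few lines. This is a hypothetical sketch of the wire layout, not the module's actual code path:

```python
import struct

def packpartparams(mandatory, advisory):
    """Pack (key, value) byte-string pairs into the
    <mandatory-count><advisory-count><param-sizes><param-data> layout."""
    allparams = list(mandatory) + list(advisory)  # mandatory come first
    out = struct.pack('>BB', len(mandatory), len(advisory))
    sizes = []
    for key, value in allparams:
        sizes.append(len(key))   # <size-of-key>
        sizes.append(len(value)) # <size-of-value>
    out += struct.pack('>' + 'BB' * len(allparams), *sizes)
    for key, value in allparams:
        out += key + value       # blob decoded via the size couples
    return out
```

Since keys and values are length-prefixed through the size couples, no separator bytes are needed in the data blob.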
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
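A reader for the payload framing described above might look like this sketch, under the assumption that no negative special-case sizes are present:

```python
import struct

def iterpayloadchunks(data):
    """Yield `chunkdata` blobs until the zero-size terminating chunk."""
    pos = 0
    while True:
        chunksize = struct.unpack_from('>i', data, pos)[0]
        pos += 4
        if chunksize == 0:
            return  # a zero size chunk concludes the payload
        if chunksize < 0:
            # negative sizes trigger special case processing (not handled here)
            raise ValueError('special chunk size: %i' % chunksize)
        yield data[pos:pos + chunksize]
        pos += chunksize
```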
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
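The case rule above reduces to a one-line check (illustrative only, not the module's exact code):

```python
def ismandatory(parttype):
    # a part type containing any uppercase character marks the part mandatory
    return parttype != parttype.lower()
```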
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 176 def outdebug(ui, message):
177 177 """debug regarding output stream (bundling)"""
178 178 ui.debug('bundle2-output: %s\n' % message)
179 179
180 180 def indebug(ui, message):
181 181 """debug on input stream (unbundling)"""
182 182 ui.debug('bundle2-input: %s\n' % message)
183 183
184 184 def validateparttype(parttype):
185 185 """raise ValueError if a parttype contains invalid character"""
186 186 if _parttypeforbidden.search(parttype):
187 187 raise ValueError(parttype)
188 188
189 189 def _makefpartparamsizes(nbparams):
190 190 """return a struct format to read part parameter sizes
191 191
192 192 The number of parameters is variable so we need to build that format
193 193 dynamically.
194 194 """
195 195 return '>'+('BB'*nbparams)
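For example, the dynamic format built this way for three parameters is simply `'>BBBBBB'`: one size couple of unsigned bytes per parameter, big-endian (a standalone re-statement of the helper for illustration):

```python
import struct

def makefpartparamsizes(nbparams):
    # same construction as _makefpartparamsizes above
    return '>' + ('BB' * nbparams)

fmt = makefpartparamsizes(3)  # three parameters -> six unsigned bytes
sizes = struct.unpack(fmt, struct.pack(fmt, 1, 2, 3, 4, 5, 6))
```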
196 196
197 197 parthandlermapping = {}
198 198
199 199 def parthandler(parttype, params=()):
200 200 """decorator that register a function as a bundle2 part handler
201 201
202 202 eg::
203 203
204 204 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
205 205 def myparttypehandler(...):
206 206 '''process a part of type "my part".'''
207 207 ...
208 208 """
209 209 validateparttype(parttype)
210 210 def _decorator(func):
211 211 lparttype = parttype.lower() # enforce lower case matching.
212 212 assert lparttype not in parthandlermapping
213 213 parthandlermapping[lparttype] = func
214 214 func.params = frozenset(params)
215 215 return func
216 216 return _decorator
217 217
218 218 class unbundlerecords(object):
219 219 """keep record of what happens during and unbundle
220 220
221 221 New records are added using `records.add('cat', obj)`. Where 'cat' is a
222 222 category of record and obj is an arbitrary object.
223 223
224 224 `records['cat']` will return all entries of this category 'cat'.
225 225
226 226 Iterating on the object itself will yield `('category', obj)` tuples
227 227 for all entries.
228 228
229 229 All iterations happen in chronological order.
230 230 """
231 231
232 232 def __init__(self):
233 233 self._categories = {}
234 234 self._sequences = []
235 235 self._replies = {}
236 236
237 237 def add(self, category, entry, inreplyto=None):
238 238 """add a new record of a given category.
239 239
240 240 The entry can then be retrieved in the list returned by
241 241 self['category']."""
242 242 self._categories.setdefault(category, []).append(entry)
243 243 self._sequences.append((category, entry))
244 244 if inreplyto is not None:
245 245 self.getreplies(inreplyto).add(category, entry)
246 246
247 247 def getreplies(self, partid):
248 248 """get the records that are replies to a specific part"""
249 249 return self._replies.setdefault(partid, unbundlerecords())
250 250
251 251 def __getitem__(self, cat):
252 252 return tuple(self._categories.get(cat, ()))
253 253
254 254 def __iter__(self):
255 255 return iter(self._sequences)
256 256
257 257 def __len__(self):
258 258 return len(self._sequences)
259 259
260 260 def __nonzero__(self):
261 261 return bool(self._sequences)
262 262
263 263 class bundleoperation(object):
264 264 """an object that represents a single bundling process
265 265
266 266 Its purpose is to carry unbundle-related objects and states.
267 267
268 268 A new object should be created at the beginning of each bundle processing.
269 269 The object is to be returned by the processing function.
270 270
271 271 The object has very little content now; it will ultimately contain:
272 272 * an access to the repo the bundle is applied to,
273 273 * a ui object,
274 274 * a way to retrieve a transaction to add changes to the repo,
275 275 * a way to record the result of processing each part,
276 276 * a way to construct a bundle response when applicable.
277 277 """
278 278
279 279 def __init__(self, repo, transactiongetter, captureoutput=True):
280 280 self.repo = repo
281 281 self.ui = repo.ui
282 282 self.records = unbundlerecords()
283 283 self.gettransaction = transactiongetter
284 284 self.reply = None
285 285 self.captureoutput = captureoutput
286 286
287 287 class TransactionUnavailable(RuntimeError):
288 288 pass
289 289
290 290 def _notransaction():
291 291 """default method to get a transaction while processing a bundle
292 292
293 293 Raise an exception to highlight the fact that no transaction was expected
294 294 to be created"""
295 295 raise TransactionUnavailable()
296 296
297 297 def processbundle(repo, unbundler, transactiongetter=None, op=None):
298 298 """This function process a bundle, apply effect to/from a repo
299 299
300 300 It iterates over each part then searches for and uses the proper handling
301 301 code to process the part. Parts are processed in order.
302 302
303 303 This is a very early version of this function that will be strongly reworked
304 304 before final usage.
305 305
306 306 An unknown Mandatory part will abort the process.
307 307
308 308 It is temporarily possible to provide a prebuilt bundleoperation to the
309 309 function. This is used to ensure output is properly propagated in case of
310 310 an error during the unbundling. This output capturing part will likely be
311 311 reworked and this ability will probably go away in the process.
312 312 """
313 313 if op is None:
314 314 if transactiongetter is None:
315 315 transactiongetter = _notransaction
316 316 op = bundleoperation(repo, transactiongetter)
317 317 # todo:
318 318 # - replace this with an init function soon.
319 319 # - exception catching
320 320 unbundler.params
321 321 iterparts = unbundler.iterparts()
322 322 part = None
323 323 try:
324 324 for part in iterparts:
325 325 _processpart(op, part)
326 326 except BaseException, exc:
327 327 for part in iterparts:
328 328 # consume the bundle content
329 329 part.seek(0, 2)
330 330 # Small hack to let caller code distinguish exceptions from bundle2
331 331 # processing from processing the old format. This is mostly
332 332 # needed to handle different return codes to unbundle according to the
333 333 # type of bundle. We should probably clean up or drop this return code
334 334 # craziness in a future version.
335 335 exc.duringunbundle2 = True
336 336 salvaged = []
337 337 if op.reply is not None:
338 338 salvaged = op.reply.salvageoutput()
339 339 exc._bundle2salvagedoutput = salvaged
340 340 raise
341 341 return op
342 342
343 343 def _processpart(op, part):
344 344 """process a single part from a bundle
345 345
346 346 The part is guaranteed to have been fully consumed when the function exits
347 347 (even if an exception is raised)."""
348 348 try:
349 349 try:
350 350 handler = parthandlermapping.get(part.type)
351 351 if handler is None:
352 352 raise error.UnsupportedPartError(parttype=part.type)
353 353 indebug(op.ui, 'found a handler for part %r' % part.type)
354 354 unknownparams = part.mandatorykeys - handler.params
355 355 if unknownparams:
356 356 unknownparams = list(unknownparams)
357 357 unknownparams.sort()
358 358 raise error.UnsupportedPartError(parttype=part.type,
359 359 params=unknownparams)
360 360 except error.UnsupportedPartError, exc:
361 361 if part.mandatory: # mandatory parts
362 362 raise
363 363 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
364 364 return # skip to part processing
365 365
366 366 # handler is called outside the above try block so that we don't
367 367 # risk catching KeyErrors from anything other than the
368 368 # parthandlermapping lookup (any KeyError raised by handler()
369 369 # itself represents a defect of a different variety).
370 370 output = None
371 371 if op.captureoutput and op.reply is not None:
372 372 op.ui.pushbuffer(error=True, subproc=True)
373 373 output = ''
374 374 try:
375 375 handler(op, part)
376 376 finally:
377 377 if output is not None:
378 378 output = op.ui.popbuffer()
379 379 if output:
380 380 outpart = op.reply.newpart('output', data=output,
381 381 mandatory=False)
382 382 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
383 383 finally:
384 384 # consume the part content to not corrupt the stream.
385 385 part.seek(0, 2)
386 386
387 387
388 388 def decodecaps(blob):
389 389 """decode a bundle2 caps bytes blob into a dictionary
390 390
391 391 The blob is a list of capabilities (one per line)
392 392 Capabilities may have values using a line of the form::
393 393
394 394 capability=value1,value2,value3
395 395
396 396 The values are always a list."""
397 397 caps = {}
398 398 for line in blob.splitlines():
399 399 if not line:
400 400 continue
401 401 if '=' not in line:
402 402 key, vals = line, ()
403 403 else:
404 404 key, vals = line.split('=', 1)
405 405 vals = vals.split(',')
406 406 key = urllib.unquote(key)
407 407 vals = [urllib.unquote(v) for v in vals]
408 408 caps[key] = vals
409 409 return caps
410 410
411 411 def encodecaps(caps):
412 412 """encode a bundle2 caps dictionary into a bytes blob"""
413 413 chunks = []
414 414 for ca in sorted(caps):
415 415 vals = caps[ca]
416 416 ca = urllib.quote(ca)
417 417 vals = [urllib.quote(v) for v in vals]
418 418 if vals:
419 419 ca = "%s=%s" % (ca, ','.join(vals))
420 420 chunks.append(ca)
421 421 return '\n'.join(chunks)
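The two capability functions above round-trip. A Python 3 port (an illustrative sketch using `urllib.parse` in place of the Python 2 `urllib` module, with suffixed names to mark it as hypothetical) shows the expected behavior:

```python
from urllib.parse import quote, unquote

def encodecaps3(caps):
    """Python 3 port of encodecaps above (sketch)."""
    chunks = []
    for ca in sorted(caps):
        vals = [quote(v) for v in caps[ca]]
        ca = quote(ca)
        if vals:
            ca = "%s=%s" % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps3(blob):
    """Python 3 port of decodecaps above (sketch)."""
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, []
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        caps[unquote(key)] = [unquote(v) for v in vals]
    return caps
```

A capability with no values encodes as a bare line, and its value list decodes as empty, consistent with "The values are always a list."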
422 422
423 423 class bundle20(object):
424 424 """represent an outgoing bundle2 container
425 425
426 426 Use the `addparam` method to add stream level parameters and `newpart` to
427 427 populate it. Then call `getchunks` to retrieve all the binary chunks of
428 428 data that compose the bundle2 container."""
429 429
430 430 _magicstring = 'HG20'
431 431
432 432 def __init__(self, ui, capabilities=()):
433 433 self.ui = ui
434 434 self._params = []
435 435 self._parts = []
436 436 self.capabilities = dict(capabilities)
437 437
438 438 @property
439 439 def nbparts(self):
440 440 """total number of parts added to the bundler"""
441 441 return len(self._parts)
442 442
443 443 # methods used to define the bundle2 content
444 444 def addparam(self, name, value=None):
445 445 """add a stream level parameter"""
446 446 if not name:
447 447 raise ValueError('empty parameter name')
448 448 if name[0] not in string.letters:
449 449 raise ValueError('non letter first character: %r' % name)
450 450 self._params.append((name, value))
451 451
452 452 def addpart(self, part):
453 453 """add a new part to the bundle2 container
454 454
455 455 Parts contain the actual application payload.
456 456 assert part.id is None
457 457 part.id = len(self._parts) # very cheap counter
458 458 self._parts.append(part)
459 459
460 460 def newpart(self, typeid, *args, **kwargs):
461 461 """create a new part and add it to the containers
462 462
463 463 As the part is directly added to the containers. For now, this means
464 464 that any failure to properly initialize the part after calling
465 465 ``newpart`` should result in a failure of the whole bundling process.
466 466
467 467 You can still fall back to manually creating and adding one if you need better
468 468 control."""
469 469 part = bundlepart(typeid, *args, **kwargs)
470 470 self.addpart(part)
471 471 return part
472 472
473 473 # methods used to generate the bundle2 stream
474 474 def getchunks(self):
475 475 if self.ui.debugflag:
476 476 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
477 477 if self._params:
478 478 msg.append(' (%i params)' % len(self._params))
479 479 msg.append(' %i parts total\n' % len(self._parts))
480 480 self.ui.debug(''.join(msg))
481 481 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
482 482 yield self._magicstring
483 483 param = self._paramchunk()
484 484 outdebug(self.ui, 'bundle parameter: %s' % param)
485 485 yield _pack(_fstreamparamsize, len(param))
486 486 if param:
487 487 yield param
488 488
489 489 outdebug(self.ui, 'start of parts')
490 490 for part in self._parts:
491 491 outdebug(self.ui, 'bundle part: "%s"' % part.type)
492 492 for chunk in part.getchunks(ui=self.ui):
493 493 yield chunk
494 494 outdebug(self.ui, 'end of bundle')
495 495 yield _pack(_fpartheadersize, 0)
496 496
497 497 def _paramchunk(self):
498 498 """return a encoded version of all stream parameters"""
499 499 blocks = []
500 500 for par, value in self._params:
501 501 par = urllib.quote(par)
502 502 if value is not None:
503 503 value = urllib.quote(value)
504 504 par = '%s=%s' % (par, value)
505 505 blocks.append(par)
506 506 return ' '.join(blocks)
507 507
508 508 def salvageoutput(self):
509 509 """return a list with a copy of all output parts in the bundle
510 510
511 511 This is meant to be used during error handling to make sure we preserve
512 512 server output"""
513 513 salvaged = []
514 514 for part in self._parts:
515 515 if part.type.startswith('output'):
516 516 salvaged.append(part.copy())
517 517 return salvaged
518 518
519 519
520 520 class unpackermixin(object):
521 521 """A mixin to extract bytes and struct data from a stream"""
522 522
523 523 def __init__(self, fp):
524 524 self._fp = fp
525 525 self._seekable = (util.safehasattr(fp, 'seek') and
526 526 util.safehasattr(fp, 'tell'))
527 527
528 528 def _unpack(self, format):
529 529 """unpack this struct format from the stream"""
530 530 data = self._readexact(struct.calcsize(format))
531 531 return _unpack(format, data)
532 532
533 533 def _readexact(self, size):
534 534 """read exactly <size> bytes from the stream"""
535 535 return changegroup.readexactly(self._fp, size)
536 536
537 537 def seek(self, offset, whence=0):
538 538 """move the underlying file pointer"""
539 539 if self._seekable:
540 540 return self._fp.seek(offset, whence)
541 541 else:
542 542 raise NotImplementedError(_('File pointer is not seekable'))
543 543
544 544 def tell(self):
545 545 """return the file offset, or None if file is not seekable"""
546 546 if self._seekable:
547 547 try:
548 548 return self._fp.tell()
549 549 except IOError, e:
550 550 if e.errno == errno.ESPIPE:
551 551 self._seekable = False
552 552 else:
553 553 raise
554 554 return None
555 555
556 556 def close(self):
557 557 """close underlying file"""
558 558 if util.safehasattr(self._fp, 'close'):
559 559 return self._fp.close()
560 560
561 561 def getunbundler(ui, fp, header=None):
562 562 """return a valid unbundler object for a given header"""
563 563 if header is None:
564 564 header = changegroup.readexactly(fp, 4)
565 565 magic, version = header[0:2], header[2:4]
566 566 if magic != 'HG':
567 567 raise util.Abort(_('not a Mercurial bundle'))
568 568 unbundlerclass = formatmap.get(version)
569 569 if unbundlerclass is None:
570 570 raise util.Abort(_('unknown bundle version %s') % version)
571 571 unbundler = unbundlerclass(ui, fp)
572 572 indebug(ui, 'start processing of %s stream' % header)
573 573 return unbundler
574 574
575 575 class unbundle20(unpackermixin):
576 576 """interpret a bundle2 stream
577 577
578 578 This class is fed with a binary stream and yields parts through its
579 579 `iterparts` method."""
580 580
581 581 def __init__(self, ui, fp):
582 582 """If header is specified, we do not read it out of the stream."""
583 583 self.ui = ui
584 584 super(unbundle20, self).__init__(fp)
585 585
586 586 @util.propertycache
587 587 def params(self):
588 588 """dictionary of stream level parameters"""
589 589 indebug(self.ui, 'reading bundle2 stream parameters')
590 590 params = {}
591 591 paramssize = self._unpack(_fstreamparamsize)[0]
592 592 if paramssize < 0:
593 593 raise error.BundleValueError('negative bundle param size: %i'
594 594 % paramssize)
595 595 if paramssize:
596 596 for p in self._readexact(paramssize).split(' '):
597 597 p = p.split('=', 1)
598 598 p = [urllib.unquote(i) for i in p]
599 599 if len(p) < 2:
600 600 p.append(None)
601 601 self._processparam(*p)
602 602 params[p[0]] = p[1]
603 603 return params
604 604
605 605 def _processparam(self, name, value):
606 606 """process a parameter, applying its effect if needed
607 607
608 608 Parameters starting with a lower case letter are advisory and will be
609 609 ignored when unknown. Those starting with an upper case letter are
610 610 mandatory, and this function will raise a KeyError when unknown.
611 611
612 612 Note: no options are currently supported. Any input will either be
613 613 ignored or fail.
614 614 """
615 615 if not name:
616 616 raise ValueError('empty parameter name')
617 617 if name[0] not in string.letters:
618 618 raise ValueError('non letter first character: %r' % name)
619 619 # Some logic will later be added here to try to process the option for
620 620 # a dict of known parameters.
621 621 if name[0].islower():
622 622 indebug(self.ui, "ignoring unknown parameter %r" % name)
623 623 else:
624 624 raise error.UnsupportedPartError(params=(name,))
625 625
626 626
627 627 def iterparts(self):
628 628 """yield all parts contained in the stream"""
629 629 # make sure params have been loaded
630 630 self.params
631 631 indebug(self.ui, 'start extraction of bundle2 parts')
632 632 headerblock = self._readpartheader()
633 633 while headerblock is not None:
634 634 part = unbundlepart(self.ui, headerblock, self._fp)
635 635 yield part
636 636 part.seek(0, 2)
637 637 headerblock = self._readpartheader()
638 638 indebug(self.ui, 'end of bundle2 stream')
639 639
640 640 def _readpartheader(self):
641 641 """reads a part header size and return the bytes blob
642 642
643 643 returns None if empty"""
644 644 headersize = self._unpack(_fpartheadersize)[0]
645 645 if headersize < 0:
646 646 raise error.BundleValueError('negative part header size: %i'
647 647 % headersize)
648 648 indebug(self.ui, 'part header size: %i' % headersize)
649 649 if headersize:
650 650 return self._readexact(headersize)
651 651 return None
652 652
653 653 def compressed(self):
654 654 return False
655 655
656 656 formatmap = {'20': unbundle20}
657 657
658 658 class bundlepart(object):
659 659 """A bundle2 part contains application level payload
660 660
661 661 The part `type` is used to route the part to the application level
662 662 handler.
663 663
664 664 The part payload is contained in ``part.data``. It could be raw bytes or a
665 665 generator of byte chunks.
666 666
667 667 You can add parameters to the part using the ``addparam`` method.
668 668 Parameters can be either mandatory (default) or advisory. Remote side
669 669 should be able to safely ignore the advisory ones.
670 670
671 671 Neither data nor parameters can be modified after generation has begun.
672 672 """
673 673
674 674 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
675 675 data='', mandatory=True):
676 676 validateparttype(parttype)
677 677 self.id = None
678 678 self.type = parttype
679 679 self._data = data
680 680 self._mandatoryparams = list(mandatoryparams)
681 681 self._advisoryparams = list(advisoryparams)
682 682 # checking for duplicated entries
683 683 self._seenparams = set()
684 684 for pname, __ in self._mandatoryparams + self._advisoryparams:
685 685 if pname in self._seenparams:
686 686 raise RuntimeError('duplicated params: %s' % pname)
687 687 self._seenparams.add(pname)
688 688 # status of the part's generation:
689 689 # - None: not started,
690 690 # - False: currently generated,
691 691 # - True: generation done.
692 692 self._generated = None
693 693 self.mandatory = mandatory
694 694
695 695 def copy(self):
696 696 """return a copy of the part
697 697
698 698 The new part has the very same content but no partid assigned yet.
699 699 Parts with generated data cannot be copied."""
700 700 assert not util.safehasattr(self.data, 'next')
701 701 return self.__class__(self.type, self._mandatoryparams,
702 702 self._advisoryparams, self._data, self.mandatory)
703 703
704 704 # methods used to define the part content
705 705 def __setdata(self, data):
706 706 if self._generated is not None:
707 707 raise error.ReadOnlyPartError('part is being generated')
708 708 self._data = data
709 709 def __getdata(self):
710 710 return self._data
711 711 data = property(__getdata, __setdata)
712 712
713 713 @property
714 714 def mandatoryparams(self):
715 715 # make it an immutable tuple to force people through ``addparam``
716 716 return tuple(self._mandatoryparams)
717 717
718 718 @property
719 719 def advisoryparams(self):
720 720 # make it an immutable tuple to force people through ``addparam``
721 721 return tuple(self._advisoryparams)
722 722
723 723 def addparam(self, name, value='', mandatory=True):
724 724 if self._generated is not None:
725 725 raise error.ReadOnlyPartError('part is being generated')
726 726 if name in self._seenparams:
727 727 raise ValueError('duplicated params: %s' % name)
728 728 self._seenparams.add(name)
729 729 params = self._advisoryparams
730 730 if mandatory:
731 731 params = self._mandatoryparams
732 732 params.append((name, value))
733 733
734 734 # methods used to generate the bundle2 stream
735 735 def getchunks(self, ui):
736 736 if self._generated is not None:
737 737 raise RuntimeError('part can only be consumed once')
738 738 self._generated = False
739 739
740 740 if ui.debugflag:
741 741 msg = ['bundle2-output-part: "%s"' % self.type]
742 742 if not self.mandatory:
743 743 msg.append(' (advisory)')
744 744 nbmp = len(self.mandatoryparams)
745 745 nbap = len(self.advisoryparams)
746 746 if nbmp or nbap:
747 747 msg.append(' (params:')
748 748 if nbmp:
749 749 msg.append(' %i mandatory' % nbmp)
750 750 if nbap:
751 751 msg.append(' %i advisory' % nbap)
752 752 msg.append(')')
753 753 if not self.data:
754 754 msg.append(' empty payload')
755 755 elif util.safehasattr(self.data, 'next'):
756 756 msg.append(' streamed payload')
757 757 else:
758 758 msg.append(' %i bytes payload' % len(self.data))
759 759 msg.append('\n')
760 760 ui.debug(''.join(msg))
761 761
762 762 #### header
763 763 if self.mandatory:
764 764 parttype = self.type.upper()
765 765 else:
766 766 parttype = self.type.lower()
767 767 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
768 768 ## parttype
769 769 header = [_pack(_fparttypesize, len(parttype)),
770 770 parttype, _pack(_fpartid, self.id),
771 771 ]
772 772 ## parameters
773 773 # count
774 774 manpar = self.mandatoryparams
775 775 advpar = self.advisoryparams
776 776 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
777 777 # size
778 778 parsizes = []
779 779 for key, value in manpar:
780 780 parsizes.append(len(key))
781 781 parsizes.append(len(value))
782 782 for key, value in advpar:
783 783 parsizes.append(len(key))
784 784 parsizes.append(len(value))
785 785 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
786 786 header.append(paramsizes)
787 787 # key, value
788 788 for key, value in manpar:
789 789 header.append(key)
790 790 header.append(value)
791 791 for key, value in advpar:
792 792 header.append(key)
793 793 header.append(value)
794 794 ## finalize header
795 795 headerchunk = ''.join(header)
796 796 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
797 797 yield _pack(_fpartheadersize, len(headerchunk))
798 798 yield headerchunk
799 799 ## payload
800 800 try:
801 801 for chunk in self._payloadchunks():
802 802 outdebug(ui, 'payload chunk size: %i' % len(chunk))
803 803 yield _pack(_fpayloadsize, len(chunk))
804 804 yield chunk
805 805 except BaseException, exc:
806 806 # backup exception data for later
807 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
808 % exc)
807 809 exc_info = sys.exc_info()
808 810 msg = 'unexpected error: %s' % exc
809 811 interpart = bundlepart('error:abort', [('message', msg)],
810 812 mandatory=False)
811 813 interpart.id = 0
812 814 yield _pack(_fpayloadsize, -1)
813 815 for chunk in interpart.getchunks(ui=ui):
814 816 yield chunk
815 817 outdebug(ui, 'closing payload chunk')
816 818 # abort current part payload
817 819 yield _pack(_fpayloadsize, 0)
818 820 raise exc_info[0], exc_info[1], exc_info[2]
819 821 # end of payload
820 822 outdebug(ui, 'closing payload chunk')
821 823 yield _pack(_fpayloadsize, 0)
822 824 self._generated = True
823 825
824 826 def _payloadchunks(self):
825 827 """yield chunks of a the part payload
826 828
827 829 Exists to handle the different methods to provide data to a part."""
828 830 # we only support fixed size data now.
829 831 # This will be improved in the future.
830 832 if util.safehasattr(self.data, 'next'):
831 833 buff = util.chunkbuffer(self.data)
832 834 chunk = buff.read(preferedchunksize)
833 835 while chunk:
834 836 yield chunk
835 837 chunk = buff.read(preferedchunksize)
836 838 elif len(self.data):
837 839 yield self.data
838 840
839 841
840 842 flaginterrupt = -1
841 843
842 844 class interrupthandler(unpackermixin):
843 845 """read one part and process it with restricted capability
844 846
845 847 This allows the transmission of exceptions raised on the producer side
846 848 during part iteration while the consumer is reading a part.
847 849
848 850 Parts processed in this manner only have access to a ui object."""
849 851
850 852 def __init__(self, ui, fp):
851 853 super(interrupthandler, self).__init__(fp)
852 854 self.ui = ui
853 855
854 856 def _readpartheader(self):
855 857 """reads a part header size and return the bytes blob
856 858
857 859 returns None if empty"""
858 860 headersize = self._unpack(_fpartheadersize)[0]
859 861 if headersize < 0:
860 862 raise error.BundleValueError('negative part header size: %i'
861 863 % headersize)
862 864 indebug(self.ui, 'part header size: %i' % headersize)
863 865 if headersize:
864 866 return self._readexact(headersize)
865 867 return None
866 868
867 869 def __call__(self):
868 870 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
869 871 headerblock = self._readpartheader()
870 872 if headerblock is None:
871 873 indebug(self.ui, 'no part found during interruption.')
872 874 return
873 875 part = unbundlepart(self.ui, headerblock, self._fp)
874 876 op = interruptoperation(self.ui)
875 877 _processpart(op, part)
876 878
877 879 class interruptoperation(object):
878 880 """A limited operation to be used by part handlers during interruption
879 881
880 882 It only has access to a ui object.
881 883 """
882 884
883 885 def __init__(self, ui):
884 886 self.ui = ui
885 887 self.reply = None
886 888 self.captureoutput = False
887 889
888 890 @property
889 891 def repo(self):
890 892 raise RuntimeError('no repo access from stream interruption')
891 893
892 894 def gettransaction(self):
893 895 raise TransactionUnavailable('no repo access from stream interruption')
894 896
895 897 class unbundlepart(unpackermixin):
896 898 """a bundle part read from a bundle"""
897 899
898 900 def __init__(self, ui, header, fp):
899 901 super(unbundlepart, self).__init__(fp)
900 902 self.ui = ui
901 903 # unbundle state attr
902 904 self._headerdata = header
903 905 self._headeroffset = 0
904 906 self._initialized = False
905 907 self.consumed = False
906 908 # part data
907 909 self.id = None
908 910 self.type = None
909 911 self.mandatoryparams = None
910 912 self.advisoryparams = None
911 913 self.params = None
912 914 self.mandatorykeys = ()
913 915 self._payloadstream = None
914 916 self._readheader()
915 917 self._mandatory = None
916 918 self._chunkindex = [] #(payload, file) position tuples for chunk starts
917 919 self._pos = 0
918 920
919 921 def _fromheader(self, size):
920 922 """return the next <size> bytes from the header"""
921 923 offset = self._headeroffset
922 924 data = self._headerdata[offset:(offset + size)]
923 925 self._headeroffset = offset + size
924 926 return data
925 927
926 928 def _unpackheader(self, format):
927 929 """read given format from header
928 930
929 931 This automatically computes the size of the format to read."""
930 932 data = self._fromheader(struct.calcsize(format))
931 933 return _unpack(format, data)
932 934
933 935 def _initparams(self, mandatoryparams, advisoryparams):
934 936 """internal function to set up all logic-related parameters"""
935 937 # make it read only to prevent people from touching it by mistake.
936 938 self.mandatoryparams = tuple(mandatoryparams)
937 939 self.advisoryparams = tuple(advisoryparams)
938 940 # user friendly UI
939 941 self.params = dict(self.mandatoryparams)
940 942 self.params.update(dict(self.advisoryparams))
941 943 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
942 944
943 945 def _payloadchunks(self, chunknum=0):
944 946 '''seek to specified chunk and start yielding data'''
945 947 if len(self._chunkindex) == 0:
946 948 assert chunknum == 0, 'Must start with chunk 0'
947 949 self._chunkindex.append((0, super(unbundlepart, self).tell()))
948 950 else:
949 951 assert chunknum < len(self._chunkindex), \
950 952 'Unknown chunk %d' % chunknum
951 953 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
952 954
953 955 pos = self._chunkindex[chunknum][0]
954 956 payloadsize = self._unpack(_fpayloadsize)[0]
955 957 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
956 958 while payloadsize:
957 959 if payloadsize == flaginterrupt:
958 960 # interruption detection, the handler will now read a
959 961 # single part and process it.
960 962 interrupthandler(self.ui, self._fp)()
961 963 elif payloadsize < 0:
962 964 msg = 'negative payload chunk size: %i' % payloadsize
963 965 raise error.BundleValueError(msg)
964 966 else:
965 967 result = self._readexact(payloadsize)
966 968 chunknum += 1
967 969 pos += payloadsize
968 970 if chunknum == len(self._chunkindex):
969 971 self._chunkindex.append((pos,
970 972 super(unbundlepart, self).tell()))
971 973 yield result
972 974 payloadsize = self._unpack(_fpayloadsize)[0]
973 975 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
974 976
975 977 def _findchunk(self, pos):
976 978 '''for a given payload position, return a chunk number and offset'''
977 979 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
978 980 if ppos == pos:
979 981 return chunk, 0
980 982 elif ppos > pos:
981 983 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
982 984 raise ValueError('Unknown chunk')
983 985
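`_findchunk` above maps an absolute payload position back to a `(chunk number, offset inside chunk)` pair using the `(payload offset, file offset)` tuples accumulated in `_chunkindex`. The same lookup, restated standalone:

```python
def findchunk(chunkindex, pos):
    # chunkindex holds one (payload offset, file offset) tuple per chunk,
    # in increasing payload-offset order; return which chunk contains
    # `pos` and how far into that chunk it falls.
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')

index = [(0, 10), (4, 18), (9, 27)]  # three chunks starting at payload 0, 4, 9
```

A position past the last recorded chunk start raises, which is why `seek` forces a full `read()` before consulting the index.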
984 986 def _readheader(self):
985 987 """read the header and setup the object"""
986 988 typesize = self._unpackheader(_fparttypesize)[0]
987 989 self.type = self._fromheader(typesize)
988 990 indebug(self.ui, 'part type: "%s"' % self.type)
989 991 self.id = self._unpackheader(_fpartid)[0]
990 992 indebug(self.ui, 'part id: "%s"' % self.id)
991 993 # extract mandatory bit from type
992 994 self.mandatory = (self.type != self.type.lower())
993 995 self.type = self.type.lower()
994 996 ## reading parameters
995 997 # param count
996 998 mancount, advcount = self._unpackheader(_fpartparamcount)
997 999 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
998 1000 # param size
999 1001 fparamsizes = _makefpartparamsizes(mancount + advcount)
1000 1002 paramsizes = self._unpackheader(fparamsizes)
1001 1003 # make it a list of pairs again
1002 1004 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1003 1005 # split mandatory from advisory
1004 1006 mansizes = paramsizes[:mancount]
1005 1007 advsizes = paramsizes[mancount:]
1006 1008 # retrieve param value
1007 1009 manparams = []
1008 1010 for key, value in mansizes:
1009 1011 manparams.append((self._fromheader(key), self._fromheader(value)))
1010 1012 advparams = []
1011 1013 for key, value in advsizes:
1012 1014 advparams.append((self._fromheader(key), self._fromheader(value)))
1013 1015 self._initparams(manparams, advparams)
1014 1016 ## part payload
1015 1017 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1016 1018 # we read the data, tell it
1017 1019 self._initialized = True
1018 1020
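`_readheader` walks a compact binary layout. Assuming the struct formats defined earlier in this file (`_fparttypesize = '>B'`, `_fpartid = '>I'`, `_fpartparamcount = '>BB'`), here is a round-trip sketch for a parameter-less part header; note that the mandatory bit rides on the case of the type name rather than in a separate field:

```python
import struct

def buildheader(parttype, partid):
    # Serialize a minimal bundle2-style part header with no parameters.
    header = struct.pack('>B', len(parttype)) + parttype  # type size + type
    header += struct.pack('>I', partid)                   # part id
    header += struct.pack('>BB', 0, 0)                    # param counts
    return header

def parseheader(header):
    typesize = struct.unpack_from('>B', header, 0)[0]
    parttype = header[1:1 + typesize]
    partid = struct.unpack_from('>I', header, 1 + typesize)[0]
    mancount, advcount = struct.unpack_from('>BB', header, 5 + typesize)
    mandatory = parttype != parttype.lower()  # upper-case type => mandatory
    return parttype.lower(), partid, mandatory, mancount, advcount
```

Parameter keys and values would follow the counts, each prefixed by its own `>B` size, as the code above reads them.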
1019 1021 def read(self, size=None):
1020 1022 """read payload data"""
1021 1023 if not self._initialized:
1022 1024 self._readheader()
1023 1025 if size is None:
1024 1026 data = self._payloadstream.read()
1025 1027 else:
1026 1028 data = self._payloadstream.read(size)
1027 1029 if size is None or len(data) < size:
1028 1030 self.consumed = True
1029 1031 self._pos += len(data)
1030 1032 return data
1031 1033
1032 1034 def tell(self):
1033 1035 return self._pos
1034 1036
1035 1037 def seek(self, offset, whence=0):
1036 1038 if whence == 0:
1037 1039 newpos = offset
1038 1040 elif whence == 1:
1039 1041 newpos = self._pos + offset
1040 1042 elif whence == 2:
1041 1043 if not self.consumed:
1042 1044 self.read()
1043 1045 newpos = self._chunkindex[-1][0] - offset
1044 1046 else:
1045 1047 raise ValueError('Unknown whence value: %r' % (whence,))
1046 1048
1047 1049 if newpos > self._chunkindex[-1][0] and not self.consumed:
1048 1050 self.read()
1049 1051 if not 0 <= newpos <= self._chunkindex[-1][0]:
1050 1052 raise ValueError('Offset out of range')
1051 1053
1052 1054 if self._pos != newpos:
1053 1055 chunk, internaloffset = self._findchunk(newpos)
1054 1056 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1055 1057 adjust = self.read(internaloffset)
1056 1058 if len(adjust) != internaloffset:
1057 1059 raise util.Abort(_('Seek failed\n'))
1058 1060 self._pos = newpos
1059 1061
1060 1062 # These are only the static capabilities.
1061 1063 # Check the 'getrepocaps' function for the rest.
1062 1064 capabilities = {'HG20': (),
1063 1065 'listkeys': (),
1064 1066 'pushkey': (),
1065 1067 'digests': tuple(sorted(util.DIGESTS.keys())),
1066 1068 'remote-changegroup': ('http', 'https'),
1067 1069 }
1068 1070
1069 1071 def getrepocaps(repo, allowpushback=False):
1070 1072 """return the bundle2 capabilities for a given repo
1071 1073
1072 1074 Exists to allow extensions (like evolution) to mutate the capabilities.
1073 1075 """
1074 1076 caps = capabilities.copy()
1075 1077 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1076 1078 if obsolete.isenabled(repo, obsolete.exchangeopt):
1077 1079 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1078 1080 caps['obsmarkers'] = supportedformat
1079 1081 if allowpushback:
1080 1082 caps['pushback'] = ()
1081 1083 return caps
1082 1084
1083 1085 def bundle2caps(remote):
1084 1086 """return the bundle capabilities of a peer as dict"""
1085 1087 raw = remote.capable('bundle2')
1086 1088 if not raw and raw != '':
1087 1089 return {}
1088 1090 capsblob = urllib.unquote(remote.capable('bundle2'))
1089 1091 return decodecaps(capsblob)
1090 1092
1091 1093 def obsmarkersversion(caps):
1092 1094 """extract the list of supported obsmarkers versions from a bundle2caps dict
1093 1095 """
1094 1096 obscaps = caps.get('obsmarkers', ())
1095 1097 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1096 1098
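For illustration, `obsmarkersversion` only keeps capability entries shaped like `V<n>`, so unrecognized entries are silently ignored rather than rejected:

```python
def obsmarkersversion(caps):
    # extract integer versions from 'V<n>' capability strings
    obscaps = caps.get('obsmarkers', ())
    return [int(c[1:]) for c in obscaps if c.startswith('V')]

caps = {'obsmarkers': ('V0', 'V1', 'experimental')}
```

With the sample `caps` above, only the two `V`-prefixed entries survive.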
1097 1099 @parthandler('changegroup', ('version',))
1098 1100 def handlechangegroup(op, inpart):
1099 1101 """apply a changegroup part on the repo
1100 1102
1101 1103 This is a very early implementation that will be massively reworked
1102 1104 before being inflicted on any end user.
1103 1105 """
1104 1106 # Make sure we trigger a transaction creation
1105 1107 #
1106 1108 # The addchangegroup function will get a transaction object by itself, but
1107 1109 # we need to make sure we trigger the creation of a transaction object used
1108 1110 # for the whole processing scope.
1109 1111 op.gettransaction()
1110 1112 unpackerversion = inpart.params.get('version', '01')
1111 1113 # We should raise an appropriate exception here
1112 1114 unpacker = changegroup.packermap[unpackerversion][1]
1113 1115 cg = unpacker(inpart, 'UN')
1114 1116 # the source and url passed here are overwritten by the one contained in
1115 1117 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1116 1118 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1117 1119 op.records.add('changegroup', {'return': ret})
1118 1120 if op.reply is not None:
1119 1121 # This is definitely not the final form of this
1120 1122 # return. But one needs to start somewhere.
1121 1123 part = op.reply.newpart('reply:changegroup', mandatory=False)
1122 1124 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1123 1125 part.addparam('return', '%i' % ret, mandatory=False)
1124 1126 assert not inpart.read()
1125 1127
1126 1128 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1127 1129 ['digest:%s' % k for k in util.DIGESTS.keys()])
1128 1130 @parthandler('remote-changegroup', _remotechangegroupparams)
1129 1131 def handleremotechangegroup(op, inpart):
1130 1132 """apply a bundle10 on the repo, given a url and validation information
1131 1133
1132 1134 All the information about the remote bundle to import is given as
1133 1135 parameters. The parameters include:
1134 1136 - url: the url to the bundle10.
1135 1137 - size: the bundle10 file size. It is used to validate that what was
1136 1138 retrieved by the client matches the server's knowledge about the bundle.
1137 1139 - digests: a space separated list of the digest types provided as
1138 1140 parameters.
1139 1141 - digest:<digest-type>: the hexadecimal representation of the digest with
1140 1142 that name. Like the size, it is used to validate that what was retrieved
1141 1143 by the client matches what the server knows about the bundle.
1142 1144
1143 1145 When multiple digest types are given, all of them are checked.
1144 1146 """
1145 1147 try:
1146 1148 raw_url = inpart.params['url']
1147 1149 except KeyError:
1148 1150 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1149 1151 parsed_url = util.url(raw_url)
1150 1152 if parsed_url.scheme not in capabilities['remote-changegroup']:
1151 1153 raise util.Abort(_('remote-changegroup does not support %s urls') %
1152 1154 parsed_url.scheme)
1153 1155
1154 1156 try:
1155 1157 size = int(inpart.params['size'])
1156 1158 except ValueError:
1157 1159 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1158 1160 % 'size')
1159 1161 except KeyError:
1160 1162 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1161 1163
1162 1164 digests = {}
1163 1165 for typ in inpart.params.get('digests', '').split():
1164 1166 param = 'digest:%s' % typ
1165 1167 try:
1166 1168 value = inpart.params[param]
1167 1169 except KeyError:
1168 1170 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1169 1171 param)
1170 1172 digests[typ] = value
1171 1173
1172 1174 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1173 1175
1174 1176 # Make sure we trigger a transaction creation
1175 1177 #
1176 1178 # The addchangegroup function will get a transaction object by itself, but
1177 1179 # we need to make sure we trigger the creation of a transaction object used
1178 1180 # for the whole processing scope.
1179 1181 op.gettransaction()
1180 1182 import exchange
1181 1183 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1182 1184 if not isinstance(cg, changegroup.cg1unpacker):
1183 1185 raise util.Abort(_('%s: not a bundle version 1.0') %
1184 1186 util.hidepassword(raw_url))
1185 1187 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1186 1188 op.records.add('changegroup', {'return': ret})
1187 1189 if op.reply is not None:
1188 1190 # This is definitely not the final form of this
1189 1191 # return. But one needs to start somewhere.
1190 1192 part = op.reply.newpart('reply:changegroup')
1191 1193 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1192 1194 part.addparam('return', '%i' % ret, mandatory=False)
1193 1195 try:
1194 1196 real_part.validate()
1195 1197 except util.Abort, e:
1196 1198 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1197 1199 (util.hidepassword(raw_url), str(e)))
1198 1200 assert not inpart.read()
1199 1201
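The handler above delegates size and digest checking to `util.digestchecker`. A self-contained sketch of the equivalent validation with `hashlib` (a hypothetical `validate_payload` helper, not the real wrapper's streaming API):

```python
import hashlib

def validate_payload(data, size, digests):
    # Check the payload length, then every named digest; `digests` maps
    # hashlib algorithm names (e.g. 'sha1') to expected hex digests.
    if len(data) != size:
        raise ValueError('size mismatch: expected %d, got %d'
                         % (size, len(data)))
    for typ, expected in sorted(digests.items()):
        actual = hashlib.new(typ, data).hexdigest()
        if actual != expected:
            raise ValueError('%s mismatch: expected %s, got %s'
                             % (typ, expected, actual))
    return True
```

When several digest types are supplied, all of them must match, just as the part documentation requires.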
1200 1202 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1201 1203 def handlereplychangegroup(op, inpart):
1202 1204 ret = int(inpart.params['return'])
1203 1205 replyto = int(inpart.params['in-reply-to'])
1204 1206 op.records.add('changegroup', {'return': ret}, replyto)
1205 1207
1206 1208 @parthandler('check:heads')
1207 1209 def handlecheckheads(op, inpart):
1208 1210 """check that the heads of the repo did not change
1209 1211
1210 1212 This is used to detect a push race when using unbundle.
1211 1213 This replaces the "heads" argument of unbundle."""
1212 1214 h = inpart.read(20)
1213 1215 heads = []
1214 1216 while len(h) == 20:
1215 1217 heads.append(h)
1216 1218 h = inpart.read(20)
1217 1219 assert not h
1218 1220 if heads != op.repo.heads():
1219 1221 raise error.PushRaced('repository changed while pushing - '
1220 1222 'please try again')
1221 1223
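The `check:heads` payload is bare framing: a concatenation of 20-byte binary node hashes with no count prefix, read until exhaustion exactly as the handler above does. Restated standalone:

```python
import io

def readheads(stream):
    # Read consecutive 20-byte node hashes; any trailing partial record
    # means the payload was malformed.
    heads = []
    h = stream.read(20)
    while len(h) == 20:
        heads.append(h)
        h = stream.read(20)
    if h:
        raise ValueError('truncated node hash in check:heads payload')
    return heads
```

An empty payload is valid and simply yields no heads, which would then fail the `heads != op.repo.heads()` comparison on any non-empty repository.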
1222 1224 @parthandler('output')
1223 1225 def handleoutput(op, inpart):
1224 1226 """forward output captured on the server to the client"""
1225 1227 for line in inpart.read().splitlines():
1226 1228 op.ui.status(('remote: %s\n' % line))
1227 1229
1228 1230 @parthandler('replycaps')
1229 1231 def handlereplycaps(op, inpart):
1230 1232 """Notify that a reply bundle should be created
1231 1233
1232 1234 The payload contains the capabilities information for the reply"""
1233 1235 caps = decodecaps(inpart.read())
1234 1236 if op.reply is None:
1235 1237 op.reply = bundle20(op.ui, caps)
1236 1238
1237 1239 @parthandler('error:abort', ('message', 'hint'))
1238 1240 def handleerrorabort(op, inpart):
1239 1241 """Used to transmit abort error over the wire"""
1240 1242 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1241 1243
1242 1244 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1243 1245 def handleerrorunsupportedcontent(op, inpart):
1244 1246 """Used to transmit unknown content error over the wire"""
1245 1247 kwargs = {}
1246 1248 parttype = inpart.params.get('parttype')
1247 1249 if parttype is not None:
1248 1250 kwargs['parttype'] = parttype
1249 1251 params = inpart.params.get('params')
1250 1252 if params is not None:
1251 1253 kwargs['params'] = params.split('\0')
1252 1254
1253 1255 raise error.UnsupportedPartError(**kwargs)
1254 1256
1255 1257 @parthandler('error:pushraced', ('message',))
1256 1258 def handleerrorpushraced(op, inpart):
1257 1259 """Used to transmit push race error over the wire"""
1258 1260 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1259 1261
1260 1262 @parthandler('listkeys', ('namespace',))
1261 1263 def handlelistkeys(op, inpart):
1262 1264 """retrieve pushkey namespace content stored in a bundle2"""
1263 1265 namespace = inpart.params['namespace']
1264 1266 r = pushkey.decodekeys(inpart.read())
1265 1267 op.records.add('listkeys', (namespace, r))
1266 1268
1267 1269 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1268 1270 def handlepushkey(op, inpart):
1269 1271 """process a pushkey request"""
1270 1272 dec = pushkey.decode
1271 1273 namespace = dec(inpart.params['namespace'])
1272 1274 key = dec(inpart.params['key'])
1273 1275 old = dec(inpart.params['old'])
1274 1276 new = dec(inpart.params['new'])
1275 1277 ret = op.repo.pushkey(namespace, key, old, new)
1276 1278 record = {'namespace': namespace,
1277 1279 'key': key,
1278 1280 'old': old,
1279 1281 'new': new}
1280 1282 op.records.add('pushkey', record)
1281 1283 if op.reply is not None:
1282 1284 rpart = op.reply.newpart('reply:pushkey')
1283 1285 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1284 1286 rpart.addparam('return', '%i' % ret, mandatory=False)
1285 1287
1286 1288 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1287 1289 def handlepushkeyreply(op, inpart):
1288 1290 """retrieve the result of a pushkey request"""
1289 1291 ret = int(inpart.params['return'])
1290 1292 partid = int(inpart.params['in-reply-to'])
1291 1293 op.records.add('pushkey', {'return': ret}, partid)
1292 1294
1293 1295 @parthandler('obsmarkers')
1294 1296 def handleobsmarker(op, inpart):
1295 1297 """add a stream of obsmarkers to the repo"""
1296 1298 tr = op.gettransaction()
1297 1299 markerdata = inpart.read()
1298 1300 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1299 1301 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1300 1302 % len(markerdata))
1301 1303 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1302 1304 if new:
1303 1305 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1304 1306 op.records.add('obsmarkers', {'new': new})
1305 1307 if op.reply is not None:
1306 1308 rpart = op.reply.newpart('reply:obsmarkers')
1307 1309 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1308 1310 rpart.addparam('new', '%i' % new, mandatory=False)
1309 1311
1310 1312
1311 1313 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1312 1314 def handleobsmarkerreply(op, inpart):
1313 1315 """retrieve the result of an obsmarkers part"""
1314 1316 ret = int(inpart.params['new'])
1315 1317 partid = int(inpart.params['in-reply-to'])
1316 1318 op.records.add('obsmarkers', {'new': ret}, partid)