##// END OF EJS Templates
bundle2: introduce a specific function for bundling debug message...
Pierre-Yves David -
r25313:8f2c362b default
parent child Browse files
Show More
@@ -1,1272 +1,1276
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 def outdebug(ui, message):
177 """debug regarding output stream (bundling)"""
178 ui.debug(message)
179
176 180 def validateparttype(parttype):
177 181 """raise ValueError if a parttype contains invalid character"""
178 182 if _parttypeforbidden.search(parttype):
179 183 raise ValueError(parttype)
180 184
181 185 def _makefpartparamsizes(nbparams):
182 186 """return a struct format to read part parameter sizes
183 187
184 188 The number parameters is variable so we need to build that format
185 189 dynamically.
186 190 """
187 191 return '>'+('BB'*nbparams)
188 192
189 193 parthandlermapping = {}
190 194
191 195 def parthandler(parttype, params=()):
192 196 """decorator that register a function as a bundle2 part handler
193 197
194 198 eg::
195 199
196 200 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 201 def myparttypehandler(...):
198 202 '''process a part of type "my part".'''
199 203 ...
200 204 """
201 205 validateparttype(parttype)
202 206 def _decorator(func):
203 207 lparttype = parttype.lower() # enforce lower case matching.
204 208 assert lparttype not in parthandlermapping
205 209 parthandlermapping[lparttype] = func
206 210 func.params = frozenset(params)
207 211 return func
208 212 return _decorator
209 213
210 214 class unbundlerecords(object):
211 215 """keep record of what happens during and unbundle
212 216
213 217 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 218 category of record and obj is an arbitrary object.
215 219
216 220 `records['cat']` will return all entries of this category 'cat'.
217 221
218 222 Iterating on the object itself will yield `('category', obj)` tuples
219 223 for all entries.
220 224
221 225 All iterations happens in chronological order.
222 226 """
223 227
224 228 def __init__(self):
225 229 self._categories = {}
226 230 self._sequences = []
227 231 self._replies = {}
228 232
229 233 def add(self, category, entry, inreplyto=None):
230 234 """add a new record of a given category.
231 235
232 236 The entry can then be retrieved in the list returned by
233 237 self['category']."""
234 238 self._categories.setdefault(category, []).append(entry)
235 239 self._sequences.append((category, entry))
236 240 if inreplyto is not None:
237 241 self.getreplies(inreplyto).add(category, entry)
238 242
239 243 def getreplies(self, partid):
240 244 """get the records that are replies to a specific part"""
241 245 return self._replies.setdefault(partid, unbundlerecords())
242 246
243 247 def __getitem__(self, cat):
244 248 return tuple(self._categories.get(cat, ()))
245 249
246 250 def __iter__(self):
247 251 return iter(self._sequences)
248 252
249 253 def __len__(self):
250 254 return len(self._sequences)
251 255
252 256 def __nonzero__(self):
253 257 return bool(self._sequences)
254 258
255 259 class bundleoperation(object):
256 260 """an object that represents a single bundling process
257 261
258 262 Its purpose is to carry unbundle-related objects and states.
259 263
260 264 A new object should be created at the beginning of each bundle processing.
261 265 The object is to be returned by the processing function.
262 266
263 267 The object has very little content now it will ultimately contain:
264 268 * an access to the repo the bundle is applied to,
265 269 * a ui object,
266 270 * a way to retrieve a transaction to add changes to the repo,
267 271 * a way to record the result of processing each part,
268 272 * a way to construct a bundle response when applicable.
269 273 """
270 274
271 275 def __init__(self, repo, transactiongetter, captureoutput=True):
272 276 self.repo = repo
273 277 self.ui = repo.ui
274 278 self.records = unbundlerecords()
275 279 self.gettransaction = transactiongetter
276 280 self.reply = None
277 281 self.captureoutput = captureoutput
278 282
279 283 class TransactionUnavailable(RuntimeError):
280 284 pass
281 285
282 286 def _notransaction():
283 287 """default method to get a transaction while processing a bundle
284 288
285 289 Raise an exception to highlight the fact that no transaction was expected
286 290 to be created"""
287 291 raise TransactionUnavailable()
288 292
289 293 def processbundle(repo, unbundler, transactiongetter=None, op=None):
290 294 """This function process a bundle, apply effect to/from a repo
291 295
292 296 It iterates over each part then searches for and uses the proper handling
293 297 code to process the part. Parts are processed in order.
294 298
295 299 This is very early version of this function that will be strongly reworked
296 300 before final usage.
297 301
298 302 Unknown Mandatory part will abort the process.
299 303
300 304 It is temporarily possible to provide a prebuilt bundleoperation to the
301 305 function. This is used to ensure output is properly propagated in case of
302 306 an error during the unbundling. This output capturing part will likely be
303 307 reworked and this ability will probably go away in the process.
304 308 """
305 309 if op is None:
306 310 if transactiongetter is None:
307 311 transactiongetter = _notransaction
308 312 op = bundleoperation(repo, transactiongetter)
309 313 # todo:
310 314 # - replace this is a init function soon.
311 315 # - exception catching
312 316 unbundler.params
313 317 iterparts = unbundler.iterparts()
314 318 part = None
315 319 try:
316 320 for part in iterparts:
317 321 _processpart(op, part)
318 322 except BaseException, exc:
319 323 for part in iterparts:
320 324 # consume the bundle content
321 325 part.seek(0, 2)
322 326 # Small hack to let caller code distinguish exceptions from bundle2
323 327 # processing from processing the old format. This is mostly
324 328 # needed to handle different return codes to unbundle according to the
325 329 # type of bundle. We should probably clean up or drop this return code
326 330 # craziness in a future version.
327 331 exc.duringunbundle2 = True
328 332 salvaged = []
329 333 if op.reply is not None:
330 334 salvaged = op.reply.salvageoutput()
331 335 exc._bundle2salvagedoutput = salvaged
332 336 raise
333 337 return op
334 338
335 339 def _processpart(op, part):
336 340 """process a single part from a bundle
337 341
338 342 The part is guaranteed to have been fully consumed when the function exits
339 343 (even if an exception is raised)."""
340 344 try:
341 345 try:
342 346 handler = parthandlermapping.get(part.type)
343 347 if handler is None:
344 348 raise error.UnsupportedPartError(parttype=part.type)
345 349 op.ui.debug('found a handler for part %r\n' % part.type)
346 350 unknownparams = part.mandatorykeys - handler.params
347 351 if unknownparams:
348 352 unknownparams = list(unknownparams)
349 353 unknownparams.sort()
350 354 raise error.UnsupportedPartError(parttype=part.type,
351 355 params=unknownparams)
352 356 except error.UnsupportedPartError, exc:
353 357 if part.mandatory: # mandatory parts
354 358 raise
355 359 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
356 360 return # skip to part processing
357 361
358 362 # handler is called outside the above try block so that we don't
359 363 # risk catching KeyErrors from anything other than the
360 364 # parthandlermapping lookup (any KeyError raised by handler()
361 365 # itself represents a defect of a different variety).
362 366 output = None
363 367 if op.captureoutput and op.reply is not None:
364 368 op.ui.pushbuffer(error=True, subproc=True)
365 369 output = ''
366 370 try:
367 371 handler(op, part)
368 372 finally:
369 373 if output is not None:
370 374 output = op.ui.popbuffer()
371 375 if output:
372 376 outpart = op.reply.newpart('output', data=output,
373 377 mandatory=False)
374 378 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
375 379 finally:
376 380 # consume the part content to not corrupt the stream.
377 381 part.seek(0, 2)
378 382
379 383
380 384 def decodecaps(blob):
381 385 """decode a bundle2 caps bytes blob into a dictionary
382 386
383 387 The blob is a list of capabilities (one per line)
384 388 Capabilities may have values using a line of the form::
385 389
386 390 capability=value1,value2,value3
387 391
388 392 The values are always a list."""
389 393 caps = {}
390 394 for line in blob.splitlines():
391 395 if not line:
392 396 continue
393 397 if '=' not in line:
394 398 key, vals = line, ()
395 399 else:
396 400 key, vals = line.split('=', 1)
397 401 vals = vals.split(',')
398 402 key = urllib.unquote(key)
399 403 vals = [urllib.unquote(v) for v in vals]
400 404 caps[key] = vals
401 405 return caps
402 406
403 407 def encodecaps(caps):
404 408 """encode a bundle2 caps dictionary into a bytes blob"""
405 409 chunks = []
406 410 for ca in sorted(caps):
407 411 vals = caps[ca]
408 412 ca = urllib.quote(ca)
409 413 vals = [urllib.quote(v) for v in vals]
410 414 if vals:
411 415 ca = "%s=%s" % (ca, ','.join(vals))
412 416 chunks.append(ca)
413 417 return '\n'.join(chunks)
414 418
415 419 class bundle20(object):
416 420 """represent an outgoing bundle2 container
417 421
418 422 Use the `addparam` method to add stream level parameter. and `newpart` to
419 423 populate it. Then call `getchunks` to retrieve all the binary chunks of
420 424 data that compose the bundle2 container."""
421 425
422 426 _magicstring = 'HG20'
423 427
424 428 def __init__(self, ui, capabilities=()):
425 429 self.ui = ui
426 430 self._params = []
427 431 self._parts = []
428 432 self.capabilities = dict(capabilities)
429 433
430 434 @property
431 435 def nbparts(self):
432 436 """total number of parts added to the bundler"""
433 437 return len(self._parts)
434 438
435 439 # methods used to defines the bundle2 content
436 440 def addparam(self, name, value=None):
437 441 """add a stream level parameter"""
438 442 if not name:
439 443 raise ValueError('empty parameter name')
440 444 if name[0] not in string.letters:
441 445 raise ValueError('non letter first character: %r' % name)
442 446 self._params.append((name, value))
443 447
444 448 def addpart(self, part):
445 449 """add a new part to the bundle2 container
446 450
447 451 Parts contains the actual applicative payload."""
448 452 assert part.id is None
449 453 part.id = len(self._parts) # very cheap counter
450 454 self._parts.append(part)
451 455
452 456 def newpart(self, typeid, *args, **kwargs):
453 457 """create a new part and add it to the containers
454 458
455 459 As the part is directly added to the containers. For now, this means
456 460 that any failure to properly initialize the part after calling
457 461 ``newpart`` should result in a failure of the whole bundling process.
458 462
459 463 You can still fall back to manually create and add if you need better
460 464 control."""
461 465 part = bundlepart(typeid, *args, **kwargs)
462 466 self.addpart(part)
463 467 return part
464 468
465 469 # methods used to generate the bundle2 stream
466 470 def getchunks(self):
467 self.ui.debug('start emission of %s stream\n' % self._magicstring)
471 outdebug(self.ui, 'start emission of %s stream\n' % self._magicstring)
468 472 yield self._magicstring
469 473 param = self._paramchunk()
470 self.ui.debug('bundle parameter: %s\n' % param)
474 outdebug(self.ui, 'bundle parameter: %s\n' % param)
471 475 yield _pack(_fstreamparamsize, len(param))
472 476 if param:
473 477 yield param
474 478
475 self.ui.debug('start of parts\n')
479 outdebug(self.ui, 'start of parts\n')
476 480 for part in self._parts:
477 self.ui.debug('bundle part: "%s"\n' % part.type)
481 outdebug(self.ui, 'bundle part: "%s"\n' % part.type)
478 482 for chunk in part.getchunks():
479 483 yield chunk
480 self.ui.debug('end of bundle\n')
484 outdebug(self.ui, 'end of bundle\n')
481 485 yield _pack(_fpartheadersize, 0)
482 486
483 487 def _paramchunk(self):
484 488 """return a encoded version of all stream parameters"""
485 489 blocks = []
486 490 for par, value in self._params:
487 491 par = urllib.quote(par)
488 492 if value is not None:
489 493 value = urllib.quote(value)
490 494 par = '%s=%s' % (par, value)
491 495 blocks.append(par)
492 496 return ' '.join(blocks)
493 497
494 498 def salvageoutput(self):
495 499 """return a list with a copy of all output parts in the bundle
496 500
497 501 This is meant to be used during error handling to make sure we preserve
498 502 server output"""
499 503 salvaged = []
500 504 for part in self._parts:
501 505 if part.type.startswith('output'):
502 506 salvaged.append(part.copy())
503 507 return salvaged
504 508
505 509
506 510 class unpackermixin(object):
507 511 """A mixin to extract bytes and struct data from a stream"""
508 512
509 513 def __init__(self, fp):
510 514 self._fp = fp
511 515 self._seekable = (util.safehasattr(fp, 'seek') and
512 516 util.safehasattr(fp, 'tell'))
513 517
514 518 def _unpack(self, format):
515 519 """unpack this struct format from the stream"""
516 520 data = self._readexact(struct.calcsize(format))
517 521 return _unpack(format, data)
518 522
519 523 def _readexact(self, size):
520 524 """read exactly <size> bytes from the stream"""
521 525 return changegroup.readexactly(self._fp, size)
522 526
523 527 def seek(self, offset, whence=0):
524 528 """move the underlying file pointer"""
525 529 if self._seekable:
526 530 return self._fp.seek(offset, whence)
527 531 else:
528 532 raise NotImplementedError(_('File pointer is not seekable'))
529 533
530 534 def tell(self):
531 535 """return the file offset, or None if file is not seekable"""
532 536 if self._seekable:
533 537 try:
534 538 return self._fp.tell()
535 539 except IOError, e:
536 540 if e.errno == errno.ESPIPE:
537 541 self._seekable = False
538 542 else:
539 543 raise
540 544 return None
541 545
542 546 def close(self):
543 547 """close underlying file"""
544 548 if util.safehasattr(self._fp, 'close'):
545 549 return self._fp.close()
546 550
547 551 def getunbundler(ui, fp, header=None):
548 552 """return a valid unbundler object for a given header"""
549 553 if header is None:
550 554 header = changegroup.readexactly(fp, 4)
551 555 magic, version = header[0:2], header[2:4]
552 556 if magic != 'HG':
553 557 raise util.Abort(_('not a Mercurial bundle'))
554 558 unbundlerclass = formatmap.get(version)
555 559 if unbundlerclass is None:
556 560 raise util.Abort(_('unknown bundle version %s') % version)
557 561 unbundler = unbundlerclass(ui, fp)
558 562 ui.debug('start processing of %s stream\n' % header)
559 563 return unbundler
560 564
561 565 class unbundle20(unpackermixin):
562 566 """interpret a bundle2 stream
563 567
564 568 This class is fed with a binary stream and yields parts through its
565 569 `iterparts` methods."""
566 570
567 571 def __init__(self, ui, fp):
568 572 """If header is specified, we do not read it out of the stream."""
569 573 self.ui = ui
570 574 super(unbundle20, self).__init__(fp)
571 575
572 576 @util.propertycache
573 577 def params(self):
574 578 """dictionary of stream level parameters"""
575 579 self.ui.debug('reading bundle2 stream parameters\n')
576 580 params = {}
577 581 paramssize = self._unpack(_fstreamparamsize)[0]
578 582 if paramssize < 0:
579 583 raise error.BundleValueError('negative bundle param size: %i'
580 584 % paramssize)
581 585 if paramssize:
582 586 for p in self._readexact(paramssize).split(' '):
583 587 p = p.split('=', 1)
584 588 p = [urllib.unquote(i) for i in p]
585 589 if len(p) < 2:
586 590 p.append(None)
587 591 self._processparam(*p)
588 592 params[p[0]] = p[1]
589 593 return params
590 594
591 595 def _processparam(self, name, value):
592 596 """process a parameter, applying its effect if needed
593 597
594 598 Parameter starting with a lower case letter are advisory and will be
595 599 ignored when unknown. Those starting with an upper case letter are
596 600 mandatory and will this function will raise a KeyError when unknown.
597 601
598 602 Note: no option are currently supported. Any input will be either
599 603 ignored or failing.
600 604 """
601 605 if not name:
602 606 raise ValueError('empty parameter name')
603 607 if name[0] not in string.letters:
604 608 raise ValueError('non letter first character: %r' % name)
605 609 # Some logic will be later added here to try to process the option for
606 610 # a dict of known parameter.
607 611 if name[0].islower():
608 612 self.ui.debug("ignoring unknown parameter %r\n" % name)
609 613 else:
610 614 raise error.UnsupportedPartError(params=(name,))
611 615
612 616
613 617 def iterparts(self):
614 618 """yield all parts contained in the stream"""
615 619 # make sure param have been loaded
616 620 self.params
617 621 self.ui.debug('start extraction of bundle2 parts\n')
618 622 headerblock = self._readpartheader()
619 623 while headerblock is not None:
620 624 part = unbundlepart(self.ui, headerblock, self._fp)
621 625 yield part
622 626 part.seek(0, 2)
623 627 headerblock = self._readpartheader()
624 628 self.ui.debug('end of bundle2 stream\n')
625 629
626 630 def _readpartheader(self):
627 631 """reads a part header size and return the bytes blob
628 632
629 633 returns None if empty"""
630 634 headersize = self._unpack(_fpartheadersize)[0]
631 635 if headersize < 0:
632 636 raise error.BundleValueError('negative part header size: %i'
633 637 % headersize)
634 638 self.ui.debug('part header size: %i\n' % headersize)
635 639 if headersize:
636 640 return self._readexact(headersize)
637 641 return None
638 642
639 643 def compressed(self):
640 644 return False
641 645
642 646 formatmap = {'20': unbundle20}
643 647
644 648 class bundlepart(object):
645 649 """A bundle2 part contains application level payload
646 650
647 651 The part `type` is used to route the part to the application level
648 652 handler.
649 653
650 654 The part payload is contained in ``part.data``. It could be raw bytes or a
651 655 generator of byte chunks.
652 656
653 657 You can add parameters to the part using the ``addparam`` method.
654 658 Parameters can be either mandatory (default) or advisory. Remote side
655 659 should be able to safely ignore the advisory ones.
656 660
657 661 Both data and parameters cannot be modified after the generation has begun.
658 662 """
659 663
660 664 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
661 665 data='', mandatory=True):
662 666 validateparttype(parttype)
663 667 self.id = None
664 668 self.type = parttype
665 669 self._data = data
666 670 self._mandatoryparams = list(mandatoryparams)
667 671 self._advisoryparams = list(advisoryparams)
668 672 # checking for duplicated entries
669 673 self._seenparams = set()
670 674 for pname, __ in self._mandatoryparams + self._advisoryparams:
671 675 if pname in self._seenparams:
672 676 raise RuntimeError('duplicated params: %s' % pname)
673 677 self._seenparams.add(pname)
674 678 # status of the part's generation:
675 679 # - None: not started,
676 680 # - False: currently generated,
677 681 # - True: generation done.
678 682 self._generated = None
679 683 self.mandatory = mandatory
680 684
681 685 def copy(self):
682 686 """return a copy of the part
683 687
684 688 The new part have the very same content but no partid assigned yet.
685 689 Parts with generated data cannot be copied."""
686 690 assert not util.safehasattr(self.data, 'next')
687 691 return self.__class__(self.type, self._mandatoryparams,
688 692 self._advisoryparams, self._data, self.mandatory)
689 693
690 694 # methods used to defines the part content
691 695 def __setdata(self, data):
692 696 if self._generated is not None:
693 697 raise error.ReadOnlyPartError('part is being generated')
694 698 self._data = data
695 699 def __getdata(self):
696 700 return self._data
697 701 data = property(__getdata, __setdata)
698 702
699 703 @property
700 704 def mandatoryparams(self):
701 705 # make it an immutable tuple to force people through ``addparam``
702 706 return tuple(self._mandatoryparams)
703 707
704 708 @property
705 709 def advisoryparams(self):
706 710 # make it an immutable tuple to force people through ``addparam``
707 711 return tuple(self._advisoryparams)
708 712
709 713 def addparam(self, name, value='', mandatory=True):
710 714 if self._generated is not None:
711 715 raise error.ReadOnlyPartError('part is being generated')
712 716 if name in self._seenparams:
713 717 raise ValueError('duplicated params: %s' % name)
714 718 self._seenparams.add(name)
715 719 params = self._advisoryparams
716 720 if mandatory:
717 721 params = self._mandatoryparams
718 722 params.append((name, value))
719 723
720 724 # methods used to generates the bundle2 stream
721 725 def getchunks(self):
722 726 if self._generated is not None:
723 727 raise RuntimeError('part can only be consumed once')
724 728 self._generated = False
725 729 #### header
726 730 if self.mandatory:
727 731 parttype = self.type.upper()
728 732 else:
729 733 parttype = self.type.lower()
730 734 ## parttype
731 735 header = [_pack(_fparttypesize, len(parttype)),
732 736 parttype, _pack(_fpartid, self.id),
733 737 ]
734 738 ## parameters
735 739 # count
736 740 manpar = self.mandatoryparams
737 741 advpar = self.advisoryparams
738 742 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
739 743 # size
740 744 parsizes = []
741 745 for key, value in manpar:
742 746 parsizes.append(len(key))
743 747 parsizes.append(len(value))
744 748 for key, value in advpar:
745 749 parsizes.append(len(key))
746 750 parsizes.append(len(value))
747 751 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
748 752 header.append(paramsizes)
749 753 # key, value
750 754 for key, value in manpar:
751 755 header.append(key)
752 756 header.append(value)
753 757 for key, value in advpar:
754 758 header.append(key)
755 759 header.append(value)
756 760 ## finalize header
757 761 headerchunk = ''.join(header)
758 762 yield _pack(_fpartheadersize, len(headerchunk))
759 763 yield headerchunk
760 764 ## payload
761 765 try:
762 766 for chunk in self._payloadchunks():
763 767 yield _pack(_fpayloadsize, len(chunk))
764 768 yield chunk
765 769 except BaseException, exc:
766 770 # backup exception data for later
767 771 exc_info = sys.exc_info()
768 772 msg = 'unexpected error: %s' % exc
769 773 interpart = bundlepart('error:abort', [('message', msg)],
770 774 mandatory=False)
771 775 interpart.id = 0
772 776 yield _pack(_fpayloadsize, -1)
773 777 for chunk in interpart.getchunks():
774 778 yield chunk
775 779 # abort current part payload
776 780 yield _pack(_fpayloadsize, 0)
777 781 raise exc_info[0], exc_info[1], exc_info[2]
778 782 # end of payload
779 783 yield _pack(_fpayloadsize, 0)
780 784 self._generated = True
781 785
782 786 def _payloadchunks(self):
783 787 """yield chunks of a the part payload
784 788
785 789 Exists to handle the different methods to provide data to a part."""
786 790 # we only support fixed size data now.
787 791 # This will be improved in the future.
788 792 if util.safehasattr(self.data, 'next'):
789 793 buff = util.chunkbuffer(self.data)
790 794 chunk = buff.read(preferedchunksize)
791 795 while chunk:
792 796 yield chunk
793 797 chunk = buff.read(preferedchunksize)
794 798 elif len(self.data):
795 799 yield self.data
796 800
797 801
798 802 flaginterrupt = -1
799 803
800 804 class interrupthandler(unpackermixin):
801 805 """read one part and process it with restricted capability
802 806
803 807 This allows to transmit exception raised on the producer size during part
804 808 iteration while the consumer is reading a part.
805 809
806 810 Part processed in this manner only have access to a ui object,"""
807 811
808 812 def __init__(self, ui, fp):
809 813 super(interrupthandler, self).__init__(fp)
810 814 self.ui = ui
811 815
812 816 def _readpartheader(self):
813 817 """reads a part header size and return the bytes blob
814 818
815 819 returns None if empty"""
816 820 headersize = self._unpack(_fpartheadersize)[0]
817 821 if headersize < 0:
818 822 raise error.BundleValueError('negative part header size: %i'
819 823 % headersize)
820 824 self.ui.debug('part header size: %i\n' % headersize)
821 825 if headersize:
822 826 return self._readexact(headersize)
823 827 return None
824 828
825 829 def __call__(self):
826 830 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
827 831 headerblock = self._readpartheader()
828 832 if headerblock is None:
829 833 self.ui.debug('no part found during interruption.\n')
830 834 return
831 835 part = unbundlepart(self.ui, headerblock, self._fp)
832 836 op = interruptoperation(self.ui)
833 837 _processpart(op, part)
834 838
835 839 class interruptoperation(object):
836 840 """A limited operation to be use by part handler during interruption
837 841
838 842 It only have access to an ui object.
839 843 """
840 844
841 845 def __init__(self, ui):
842 846 self.ui = ui
843 847 self.reply = None
844 848 self.captureoutput = False
845 849
846 850 @property
847 851 def repo(self):
848 852 raise RuntimeError('no repo access from stream interruption')
849 853
850 854 def gettransaction(self):
851 855 raise TransactionUnavailable('no repo access from stream interruption')
852 856
853 857 class unbundlepart(unpackermixin):
854 858 """a bundle part read from a bundle"""
855 859
856 860 def __init__(self, ui, header, fp):
857 861 super(unbundlepart, self).__init__(fp)
858 862 self.ui = ui
859 863 # unbundle state attr
860 864 self._headerdata = header
861 865 self._headeroffset = 0
862 866 self._initialized = False
863 867 self.consumed = False
864 868 # part data
865 869 self.id = None
866 870 self.type = None
867 871 self.mandatoryparams = None
868 872 self.advisoryparams = None
869 873 self.params = None
870 874 self.mandatorykeys = ()
871 875 self._payloadstream = None
872 876 self._readheader()
873 877 self._mandatory = None
874 878 self._chunkindex = [] #(payload, file) position tuples for chunk starts
875 879 self._pos = 0
876 880
877 881 def _fromheader(self, size):
878 882 """return the next <size> byte from the header"""
879 883 offset = self._headeroffset
880 884 data = self._headerdata[offset:(offset + size)]
881 885 self._headeroffset = offset + size
882 886 return data
883 887
884 888 def _unpackheader(self, format):
885 889 """read given format from header
886 890
887 891 This automatically compute the size of the format to read."""
888 892 data = self._fromheader(struct.calcsize(format))
889 893 return _unpack(format, data)
890 894
891 895 def _initparams(self, mandatoryparams, advisoryparams):
892 896 """internal function to setup all logic related parameters"""
893 897 # make it read only to prevent people touching it by mistake.
894 898 self.mandatoryparams = tuple(mandatoryparams)
895 899 self.advisoryparams = tuple(advisoryparams)
896 900 # user friendly UI
897 901 self.params = dict(self.mandatoryparams)
898 902 self.params.update(dict(self.advisoryparams))
899 903 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
900 904
901 905 def _payloadchunks(self, chunknum=0):
902 906 '''seek to specified chunk and start yielding data'''
903 907 if len(self._chunkindex) == 0:
904 908 assert chunknum == 0, 'Must start with chunk 0'
905 909 self._chunkindex.append((0, super(unbundlepart, self).tell()))
906 910 else:
907 911 assert chunknum < len(self._chunkindex), \
908 912 'Unknown chunk %d' % chunknum
909 913 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
910 914
911 915 pos = self._chunkindex[chunknum][0]
912 916 payloadsize = self._unpack(_fpayloadsize)[0]
913 917 self.ui.debug('payload chunk size: %i\n' % payloadsize)
914 918 while payloadsize:
915 919 if payloadsize == flaginterrupt:
916 920 # interruption detection, the handler will now read a
917 921 # single part and process it.
918 922 interrupthandler(self.ui, self._fp)()
919 923 elif payloadsize < 0:
920 924 msg = 'negative payload chunk size: %i' % payloadsize
921 925 raise error.BundleValueError(msg)
922 926 else:
923 927 result = self._readexact(payloadsize)
924 928 chunknum += 1
925 929 pos += payloadsize
926 930 if chunknum == len(self._chunkindex):
927 931 self._chunkindex.append((pos,
928 932 super(unbundlepart, self).tell()))
929 933 yield result
930 934 payloadsize = self._unpack(_fpayloadsize)[0]
931 935 self.ui.debug('payload chunk size: %i\n' % payloadsize)
932 936
933 937 def _findchunk(self, pos):
934 938 '''for a given payload position, return a chunk number and offset'''
935 939 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
936 940 if ppos == pos:
937 941 return chunk, 0
938 942 elif ppos > pos:
939 943 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
940 944 raise ValueError('Unknown chunk')
941 945
942 946 def _readheader(self):
943 947 """read the header and setup the object"""
944 948 typesize = self._unpackheader(_fparttypesize)[0]
945 949 self.type = self._fromheader(typesize)
946 950 self.ui.debug('part type: "%s"\n' % self.type)
947 951 self.id = self._unpackheader(_fpartid)[0]
948 952 self.ui.debug('part id: "%s"\n' % self.id)
949 953 # extract mandatory bit from type
950 954 self.mandatory = (self.type != self.type.lower())
951 955 self.type = self.type.lower()
952 956 ## reading parameters
953 957 # param count
954 958 mancount, advcount = self._unpackheader(_fpartparamcount)
955 959 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
956 960 # param size
957 961 fparamsizes = _makefpartparamsizes(mancount + advcount)
958 962 paramsizes = self._unpackheader(fparamsizes)
959 963 # make it a list of couple again
960 964 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
961 965 # split mandatory from advisory
962 966 mansizes = paramsizes[:mancount]
963 967 advsizes = paramsizes[mancount:]
964 968 # retrieve param value
965 969 manparams = []
966 970 for key, value in mansizes:
967 971 manparams.append((self._fromheader(key), self._fromheader(value)))
968 972 advparams = []
969 973 for key, value in advsizes:
970 974 advparams.append((self._fromheader(key), self._fromheader(value)))
971 975 self._initparams(manparams, advparams)
972 976 ## part payload
973 977 self._payloadstream = util.chunkbuffer(self._payloadchunks())
974 978 # we read the data, tell it
975 979 self._initialized = True
976 980
977 981 def read(self, size=None):
978 982 """read payload data"""
979 983 if not self._initialized:
980 984 self._readheader()
981 985 if size is None:
982 986 data = self._payloadstream.read()
983 987 else:
984 988 data = self._payloadstream.read(size)
985 989 if size is None or len(data) < size:
986 990 self.consumed = True
987 991 self._pos += len(data)
988 992 return data
989 993
990 994 def tell(self):
991 995 return self._pos
992 996
993 997 def seek(self, offset, whence=0):
994 998 if whence == 0:
995 999 newpos = offset
996 1000 elif whence == 1:
997 1001 newpos = self._pos + offset
998 1002 elif whence == 2:
999 1003 if not self.consumed:
1000 1004 self.read()
1001 1005 newpos = self._chunkindex[-1][0] - offset
1002 1006 else:
1003 1007 raise ValueError('Unknown whence value: %r' % (whence,))
1004 1008
1005 1009 if newpos > self._chunkindex[-1][0] and not self.consumed:
1006 1010 self.read()
1007 1011 if not 0 <= newpos <= self._chunkindex[-1][0]:
1008 1012 raise ValueError('Offset out of range')
1009 1013
1010 1014 if self._pos != newpos:
1011 1015 chunk, internaloffset = self._findchunk(newpos)
1012 1016 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1013 1017 adjust = self.read(internaloffset)
1014 1018 if len(adjust) != internaloffset:
1015 1019 raise util.Abort(_('Seek failed\n'))
1016 1020 self._pos = newpos
1017 1021
1018 1022 capabilities = {'HG20': (),
1019 1023 'listkeys': (),
1020 1024 'pushkey': (),
1021 1025 'digests': tuple(sorted(util.DIGESTS.keys())),
1022 1026 'remote-changegroup': ('http', 'https'),
1023 1027 }
1024 1028
1025 1029 def getrepocaps(repo, allowpushback=False):
1026 1030 """return the bundle2 capabilities for a given repo
1027 1031
1028 1032 Exists to allow extensions (like evolution) to mutate the capabilities.
1029 1033 """
1030 1034 caps = capabilities.copy()
1031 1035 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1032 1036 if obsolete.isenabled(repo, obsolete.exchangeopt):
1033 1037 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1034 1038 caps['obsmarkers'] = supportedformat
1035 1039 if allowpushback:
1036 1040 caps['pushback'] = ()
1037 1041 return caps
1038 1042
1039 1043 def bundle2caps(remote):
1040 1044 """return the bundle capabilities of a peer as dict"""
1041 1045 raw = remote.capable('bundle2')
1042 1046 if not raw and raw != '':
1043 1047 return {}
1044 1048 capsblob = urllib.unquote(remote.capable('bundle2'))
1045 1049 return decodecaps(capsblob)
1046 1050
1047 1051 def obsmarkersversion(caps):
1048 1052 """extract the list of supported obsmarkers versions from a bundle2caps dict
1049 1053 """
1050 1054 obscaps = caps.get('obsmarkers', ())
1051 1055 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1052 1056
1053 1057 @parthandler('changegroup', ('version',))
1054 1058 def handlechangegroup(op, inpart):
1055 1059 """apply a changegroup part on the repo
1056 1060
1057 1061 This is a very early implementation that will massive rework before being
1058 1062 inflicted to any end-user.
1059 1063 """
1060 1064 # Make sure we trigger a transaction creation
1061 1065 #
1062 1066 # The addchangegroup function will get a transaction object by itself, but
1063 1067 # we need to make sure we trigger the creation of a transaction object used
1064 1068 # for the whole processing scope.
1065 1069 op.gettransaction()
1066 1070 unpackerversion = inpart.params.get('version', '01')
1067 1071 # We should raise an appropriate exception here
1068 1072 unpacker = changegroup.packermap[unpackerversion][1]
1069 1073 cg = unpacker(inpart, 'UN')
1070 1074 # the source and url passed here are overwritten by the one contained in
1071 1075 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1072 1076 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1073 1077 op.records.add('changegroup', {'return': ret})
1074 1078 if op.reply is not None:
1075 1079 # This is definitely not the final form of this
1076 1080 # return. But one need to start somewhere.
1077 1081 part = op.reply.newpart('reply:changegroup', mandatory=False)
1078 1082 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1079 1083 part.addparam('return', '%i' % ret, mandatory=False)
1080 1084 assert not inpart.read()
1081 1085
1082 1086 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1083 1087 ['digest:%s' % k for k in util.DIGESTS.keys()])
1084 1088 @parthandler('remote-changegroup', _remotechangegroupparams)
1085 1089 def handleremotechangegroup(op, inpart):
1086 1090 """apply a bundle10 on the repo, given an url and validation information
1087 1091
1088 1092 All the information about the remote bundle to import are given as
1089 1093 parameters. The parameters include:
1090 1094 - url: the url to the bundle10.
1091 1095 - size: the bundle10 file size. It is used to validate what was
1092 1096 retrieved by the client matches the server knowledge about the bundle.
1093 1097 - digests: a space separated list of the digest types provided as
1094 1098 parameters.
1095 1099 - digest:<digest-type>: the hexadecimal representation of the digest with
1096 1100 that name. Like the size, it is used to validate what was retrieved by
1097 1101 the client matches what the server knows about the bundle.
1098 1102
1099 1103 When multiple digest types are given, all of them are checked.
1100 1104 """
1101 1105 try:
1102 1106 raw_url = inpart.params['url']
1103 1107 except KeyError:
1104 1108 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1105 1109 parsed_url = util.url(raw_url)
1106 1110 if parsed_url.scheme not in capabilities['remote-changegroup']:
1107 1111 raise util.Abort(_('remote-changegroup does not support %s urls') %
1108 1112 parsed_url.scheme)
1109 1113
1110 1114 try:
1111 1115 size = int(inpart.params['size'])
1112 1116 except ValueError:
1113 1117 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1114 1118 % 'size')
1115 1119 except KeyError:
1116 1120 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1117 1121
1118 1122 digests = {}
1119 1123 for typ in inpart.params.get('digests', '').split():
1120 1124 param = 'digest:%s' % typ
1121 1125 try:
1122 1126 value = inpart.params[param]
1123 1127 except KeyError:
1124 1128 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1125 1129 param)
1126 1130 digests[typ] = value
1127 1131
1128 1132 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1129 1133
1130 1134 # Make sure we trigger a transaction creation
1131 1135 #
1132 1136 # The addchangegroup function will get a transaction object by itself, but
1133 1137 # we need to make sure we trigger the creation of a transaction object used
1134 1138 # for the whole processing scope.
1135 1139 op.gettransaction()
1136 1140 import exchange
1137 1141 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1138 1142 if not isinstance(cg, changegroup.cg1unpacker):
1139 1143 raise util.Abort(_('%s: not a bundle version 1.0') %
1140 1144 util.hidepassword(raw_url))
1141 1145 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1142 1146 op.records.add('changegroup', {'return': ret})
1143 1147 if op.reply is not None:
1144 1148 # This is definitely not the final form of this
1145 1149 # return. But one need to start somewhere.
1146 1150 part = op.reply.newpart('reply:changegroup')
1147 1151 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1148 1152 part.addparam('return', '%i' % ret, mandatory=False)
1149 1153 try:
1150 1154 real_part.validate()
1151 1155 except util.Abort, e:
1152 1156 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1153 1157 (util.hidepassword(raw_url), str(e)))
1154 1158 assert not inpart.read()
1155 1159
1156 1160 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1157 1161 def handlereplychangegroup(op, inpart):
1158 1162 ret = int(inpart.params['return'])
1159 1163 replyto = int(inpart.params['in-reply-to'])
1160 1164 op.records.add('changegroup', {'return': ret}, replyto)
1161 1165
1162 1166 @parthandler('check:heads')
1163 1167 def handlecheckheads(op, inpart):
1164 1168 """check that head of the repo did not change
1165 1169
1166 1170 This is used to detect a push race when using unbundle.
1167 1171 This replaces the "heads" argument of unbundle."""
1168 1172 h = inpart.read(20)
1169 1173 heads = []
1170 1174 while len(h) == 20:
1171 1175 heads.append(h)
1172 1176 h = inpart.read(20)
1173 1177 assert not h
1174 1178 if heads != op.repo.heads():
1175 1179 raise error.PushRaced('repository changed while pushing - '
1176 1180 'please try again')
1177 1181
1178 1182 @parthandler('output')
1179 1183 def handleoutput(op, inpart):
1180 1184 """forward output captured on the server to the client"""
1181 1185 for line in inpart.read().splitlines():
1182 1186 op.ui.status(('remote: %s\n' % line))
1183 1187
1184 1188 @parthandler('replycaps')
1185 1189 def handlereplycaps(op, inpart):
1186 1190 """Notify that a reply bundle should be created
1187 1191
1188 1192 The payload contains the capabilities information for the reply"""
1189 1193 caps = decodecaps(inpart.read())
1190 1194 if op.reply is None:
1191 1195 op.reply = bundle20(op.ui, caps)
1192 1196
1193 1197 @parthandler('error:abort', ('message', 'hint'))
1194 1198 def handleerrorabort(op, inpart):
1195 1199 """Used to transmit abort error over the wire"""
1196 1200 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1197 1201
1198 1202 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1199 1203 def handleerrorunsupportedcontent(op, inpart):
1200 1204 """Used to transmit unknown content error over the wire"""
1201 1205 kwargs = {}
1202 1206 parttype = inpart.params.get('parttype')
1203 1207 if parttype is not None:
1204 1208 kwargs['parttype'] = parttype
1205 1209 params = inpart.params.get('params')
1206 1210 if params is not None:
1207 1211 kwargs['params'] = params.split('\0')
1208 1212
1209 1213 raise error.UnsupportedPartError(**kwargs)
1210 1214
1211 1215 @parthandler('error:pushraced', ('message',))
1212 1216 def handleerrorpushraced(op, inpart):
1213 1217 """Used to transmit push race error over the wire"""
1214 1218 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1215 1219
1216 1220 @parthandler('listkeys', ('namespace',))
1217 1221 def handlelistkeys(op, inpart):
1218 1222 """retrieve pushkey namespace content stored in a bundle2"""
1219 1223 namespace = inpart.params['namespace']
1220 1224 r = pushkey.decodekeys(inpart.read())
1221 1225 op.records.add('listkeys', (namespace, r))
1222 1226
1223 1227 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1224 1228 def handlepushkey(op, inpart):
1225 1229 """process a pushkey request"""
1226 1230 dec = pushkey.decode
1227 1231 namespace = dec(inpart.params['namespace'])
1228 1232 key = dec(inpart.params['key'])
1229 1233 old = dec(inpart.params['old'])
1230 1234 new = dec(inpart.params['new'])
1231 1235 ret = op.repo.pushkey(namespace, key, old, new)
1232 1236 record = {'namespace': namespace,
1233 1237 'key': key,
1234 1238 'old': old,
1235 1239 'new': new}
1236 1240 op.records.add('pushkey', record)
1237 1241 if op.reply is not None:
1238 1242 rpart = op.reply.newpart('reply:pushkey')
1239 1243 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1240 1244 rpart.addparam('return', '%i' % ret, mandatory=False)
1241 1245
1242 1246 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1243 1247 def handlepushkeyreply(op, inpart):
1244 1248 """retrieve the result of a pushkey request"""
1245 1249 ret = int(inpart.params['return'])
1246 1250 partid = int(inpart.params['in-reply-to'])
1247 1251 op.records.add('pushkey', {'return': ret}, partid)
1248 1252
1249 1253 @parthandler('obsmarkers')
1250 1254 def handleobsmarker(op, inpart):
1251 1255 """add a stream of obsmarkers to the repo"""
1252 1256 tr = op.gettransaction()
1253 1257 markerdata = inpart.read()
1254 1258 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1255 1259 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1256 1260 % len(markerdata))
1257 1261 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1258 1262 if new:
1259 1263 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1260 1264 op.records.add('obsmarkers', {'new': new})
1261 1265 if op.reply is not None:
1262 1266 rpart = op.reply.newpart('reply:obsmarkers')
1263 1267 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1264 1268 rpart.addparam('new', '%i' % new, mandatory=False)
1265 1269
1266 1270
1267 1271 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1268 1272 def handlepushkeyreply(op, inpart):
1269 1273 """retrieve the result of a pushkey request"""
1270 1274 ret = int(inpart.params['new'])
1271 1275 partid = int(inpart.params['in-reply-to'])
1272 1276 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now