bundle2: handle new line in 'indebug' function...
Pierre-Yves David
r25320:697d8953 default
@@ -1,1282 +1,1282 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the unbundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 The payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing; currently a
129 129 chunk size of -1 signals a stream interruption (see `flaginterrupt` below).
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 176 def outdebug(ui, message):
177 177 """debug regarding output stream (bundling)"""
178 178 ui.debug('bundle2-output: %s\n' % message)
179 179
180 180 def indebug(ui, message):
181 181 """debug on input stream (unbundling)"""
182 ui.debug('bundle2-input: %s' % message)
182 ui.debug('bundle2-input: %s\n' % message)
183 183
184 184 def validateparttype(parttype):
185 185 """raise ValueError if a parttype contains invalid character"""
186 186 if _parttypeforbidden.search(parttype):
187 187 raise ValueError(parttype)
188 188
189 189 def _makefpartparamsizes(nbparams):
190 190 """return a struct format to read part parameter sizes
191 191
192 192 The number parameters is variable so we need to build that format
193 193 dynamically.
194 194 """
195 195 return '>'+('BB'*nbparams)
196 196
197 197 parthandlermapping = {}
198 198
199 199 def parthandler(parttype, params=()):
200 200 """decorator that register a function as a bundle2 part handler
201 201
202 202 eg::
203 203
204 204 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
205 205 def myparttypehandler(...):
206 206 '''process a part of type "my part".'''
207 207 ...
208 208 """
209 209 validateparttype(parttype)
210 210 def _decorator(func):
211 211 lparttype = parttype.lower() # enforce lower case matching.
212 212 assert lparttype not in parthandlermapping
213 213 parthandlermapping[lparttype] = func
214 214 func.params = frozenset(params)
215 215 return func
216 216 return _decorator
217 217
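As a concrete illustration of the decorator above, here is how a hypothetical extension could register a handler; 'myextension:echo' and its 'message' parameter are made up for this sketch and are not real Mercurial part types.

    # Illustrative sketch: 'myextension:echo' is a hypothetical part type.
    @parthandler('myextension:echo', ('message',))
    def handleecho(op, inpart):
        """echo the mandatory 'message' parameter back to the user"""
        op.ui.status('echo: %s\n' % inpart.params['message'])
        inpart.read()  # make sure the (empty) payload is consumed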
218 218 class unbundlerecords(object):
219 219 """keep record of what happens during and unbundle
220 220
221 221 New records are added using `records.add('cat', obj)`. Where 'cat' is a
222 222 category of record and obj is an arbitrary object.
223 223
224 224 `records['cat']` will return all entries of this category 'cat'.
225 225
226 226 Iterating on the object itself will yield `('category', obj)` tuples
227 227 for all entries.
228 228
229 229 All iterations happen in chronological order.
230 230 """
231 231
232 232 def __init__(self):
233 233 self._categories = {}
234 234 self._sequences = []
235 235 self._replies = {}
236 236
237 237 def add(self, category, entry, inreplyto=None):
238 238 """add a new record of a given category.
239 239
240 240 The entry can then be retrieved in the list returned by
241 241 self['category']."""
242 242 self._categories.setdefault(category, []).append(entry)
243 243 self._sequences.append((category, entry))
244 244 if inreplyto is not None:
245 245 self.getreplies(inreplyto).add(category, entry)
246 246
247 247 def getreplies(self, partid):
248 248 """get the records that are replies to a specific part"""
249 249 return self._replies.setdefault(partid, unbundlerecords())
250 250
251 251 def __getitem__(self, cat):
252 252 return tuple(self._categories.get(cat, ()))
253 253
254 254 def __iter__(self):
255 255 return iter(self._sequences)
256 256
257 257 def __len__(self):
258 258 return len(self._sequences)
259 259
260 260 def __nonzero__(self):
261 261 return bool(self._sequences)
262 262
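A short usage sketch of the record API documented above; the category and values are arbitrary examples.

    # Illustrative sketch of the unbundlerecords API; the values are arbitrary.
    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('changegroup', {'return': 0}, inreplyto=42)
    assert records['changegroup'] == ({'return': 1}, {'return': 0})
    assert list(records) == [('changegroup', {'return': 1}),
                             ('changegroup', {'return': 0})]
    assert records.getreplies(42)['changegroup'] == ({'return': 0},)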
263 263 class bundleoperation(object):
264 264 """an object that represents a single bundling process
265 265
266 266 Its purpose is to carry unbundle-related objects and states.
267 267
268 268 A new object should be created at the beginning of each bundle processing.
269 269 The object is to be returned by the processing function.
270 270
271 271 The object has very little content now; it will ultimately contain:
272 272 * an access to the repo the bundle is applied to,
273 273 * a ui object,
274 274 * a way to retrieve a transaction to add changes to the repo,
275 275 * a way to record the result of processing each part,
276 276 * a way to construct a bundle response when applicable.
277 277 """
278 278
279 279 def __init__(self, repo, transactiongetter, captureoutput=True):
280 280 self.repo = repo
281 281 self.ui = repo.ui
282 282 self.records = unbundlerecords()
283 283 self.gettransaction = transactiongetter
284 284 self.reply = None
285 285 self.captureoutput = captureoutput
286 286
287 287 class TransactionUnavailable(RuntimeError):
288 288 pass
289 289
290 290 def _notransaction():
291 291 """default method to get a transaction while processing a bundle
292 292
293 293 Raise an exception to highlight the fact that no transaction was expected
294 294 to be created"""
295 295 raise TransactionUnavailable()
296 296
297 297 def processbundle(repo, unbundler, transactiongetter=None, op=None):
298 298 """This function process a bundle, apply effect to/from a repo
299 299
300 300 It iterates over each part then searches for and uses the proper handling
301 301 code to process the part. Parts are processed in order.
302 302
303 303 This is a very early version of this function that will be strongly reworked
304 304 before final usage.
305 305
306 306 An unknown mandatory part will abort the process.
307 307
308 308 It is temporarily possible to provide a prebuilt bundleoperation to the
309 309 function. This is used to ensure output is properly propagated in case of
310 310 an error during the unbundling. This output capturing part will likely be
311 311 reworked and this ability will probably go away in the process.
312 312 """
313 313 if op is None:
314 314 if transactiongetter is None:
315 315 transactiongetter = _notransaction
316 316 op = bundleoperation(repo, transactiongetter)
317 317 # todo:
318 318 # - replace this with an init function soon.
319 319 # - exception catching
320 320 unbundler.params
321 321 iterparts = unbundler.iterparts()
322 322 part = None
323 323 try:
324 324 for part in iterparts:
325 325 _processpart(op, part)
326 326 except BaseException, exc:
327 327 for part in iterparts:
328 328 # consume the bundle content
329 329 part.seek(0, 2)
330 330 # Small hack to let caller code distinguish exceptions from bundle2
331 331 # processing from processing the old format. This is mostly
332 332 # needed to handle different return codes to unbundle according to the
333 333 # type of bundle. We should probably clean up or drop this return code
334 334 # craziness in a future version.
335 335 exc.duringunbundle2 = True
336 336 salvaged = []
337 337 if op.reply is not None:
338 338 salvaged = op.reply.salvageoutput()
339 339 exc._bundle2salvagedoutput = salvaged
340 340 raise
341 341 return op
342 342
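For orientation, a hedged sketch of how a caller might drive processbundle for an incoming stream that carries no changesets, so the default _notransaction getter is enough; `repo` (a localrepository) and `fp` (the stream) are assumed to exist.

    # Illustrative sketch: process an incoming bundle2 stream that needs no
    # transaction. `repo` and `fp` are assumed to be provided by the caller.
    op = processbundle(repo, getunbundler(repo.ui, fp))
    for namespace, result in op.records['listkeys']:
        repo.ui.note('listkeys %s: %d entries\n' % (namespace, len(result)))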
343 343 def _processpart(op, part):
344 344 """process a single part from a bundle
345 345
346 346 The part is guaranteed to have been fully consumed when the function exits
347 347 (even if an exception is raised)."""
348 348 try:
349 349 try:
350 350 handler = parthandlermapping.get(part.type)
351 351 if handler is None:
352 352 raise error.UnsupportedPartError(parttype=part.type)
353 indebug(op.ui, 'found a handler for part %r\n' % part.type)
353 indebug(op.ui, 'found a handler for part %r' % part.type)
354 354 unknownparams = part.mandatorykeys - handler.params
355 355 if unknownparams:
356 356 unknownparams = list(unknownparams)
357 357 unknownparams.sort()
358 358 raise error.UnsupportedPartError(parttype=part.type,
359 359 params=unknownparams)
360 360 except error.UnsupportedPartError, exc:
361 361 if part.mandatory: # mandatory parts
362 362 raise
363 indebug(op.ui, 'ignoring unsupported advisory part %s\n' % exc)
363 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
364 364 return # skip to part processing
365 365
366 366 # handler is called outside the above try block so that we don't
367 367 # risk catching KeyErrors from anything other than the
368 368 # parthandlermapping lookup (any KeyError raised by handler()
369 369 # itself represents a defect of a different variety).
370 370 output = None
371 371 if op.captureoutput and op.reply is not None:
372 372 op.ui.pushbuffer(error=True, subproc=True)
373 373 output = ''
374 374 try:
375 375 handler(op, part)
376 376 finally:
377 377 if output is not None:
378 378 output = op.ui.popbuffer()
379 379 if output:
380 380 outpart = op.reply.newpart('output', data=output,
381 381 mandatory=False)
382 382 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
383 383 finally:
384 384 # consume the part content to not corrupt the stream.
385 385 part.seek(0, 2)
386 386
387 387
388 388 def decodecaps(blob):
389 389 """decode a bundle2 caps bytes blob into a dictionary
390 390
391 391 The blob is a list of capabilities (one per line)
392 392 Capabilities may have values using a line of the form::
393 393
394 394 capability=value1,value2,value3
395 395
396 396 The values are always a list."""
397 397 caps = {}
398 398 for line in blob.splitlines():
399 399 if not line:
400 400 continue
401 401 if '=' not in line:
402 402 key, vals = line, ()
403 403 else:
404 404 key, vals = line.split('=', 1)
405 405 vals = vals.split(',')
406 406 key = urllib.unquote(key)
407 407 vals = [urllib.unquote(v) for v in vals]
408 408 caps[key] = vals
409 409 return caps
410 410
411 411 def encodecaps(caps):
412 412 """encode a bundle2 caps dictionary into a bytes blob"""
413 413 chunks = []
414 414 for ca in sorted(caps):
415 415 vals = caps[ca]
416 416 ca = urllib.quote(ca)
417 417 vals = [urllib.quote(v) for v in vals]
418 418 if vals:
419 419 ca = "%s=%s" % (ca, ','.join(vals))
420 420 chunks.append(ca)
421 421 return '\n'.join(chunks)
422 422
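The two helpers above round-trip the capabilities blob; for instance (values chosen only for illustration):

    # Illustrative round-trip of the capabilities blob format.
    caps = {'HG20': [], 'changegroup': ['01', '02']}
    blob = encodecaps(caps)
    # blob is now 'HG20\nchangegroup=01,02'
    assert decodecaps(blob) == caps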
423 423 class bundle20(object):
424 424 """represent an outgoing bundle2 container
425 425
426 426 Use the `addparam` method to add a stream level parameter and `newpart` to
427 427 populate it. Then call `getchunks` to retrieve all the binary chunks of
428 428 data that compose the bundle2 container."""
429 429
430 430 _magicstring = 'HG20'
431 431
432 432 def __init__(self, ui, capabilities=()):
433 433 self.ui = ui
434 434 self._params = []
435 435 self._parts = []
436 436 self.capabilities = dict(capabilities)
437 437
438 438 @property
439 439 def nbparts(self):
440 440 """total number of parts added to the bundler"""
441 441 return len(self._parts)
442 442
443 443 # methods used to define the bundle2 content
444 444 def addparam(self, name, value=None):
445 445 """add a stream level parameter"""
446 446 if not name:
447 447 raise ValueError('empty parameter name')
448 448 if name[0] not in string.letters:
449 449 raise ValueError('non letter first character: %r' % name)
450 450 self._params.append((name, value))
451 451
452 452 def addpart(self, part):
453 453 """add a new part to the bundle2 container
454 454
455 455 Parts contain the actual application payload."""
456 456 assert part.id is None
457 457 part.id = len(self._parts) # very cheap counter
458 458 self._parts.append(part)
459 459
460 460 def newpart(self, typeid, *args, **kwargs):
461 461 """create a new part and add it to the containers
462 462
463 463 As the part is directly added to the containers. For now, this means
464 464 that any failure to properly initialize the part after calling
465 465 ``newpart`` should result in a failure of the whole bundling process.
466 466
467 467 You can still fall back to manually create and add if you need better
468 468 control."""
469 469 part = bundlepart(typeid, *args, **kwargs)
470 470 self.addpart(part)
471 471 return part
472 472
473 473 # methods used to generate the bundle2 stream
474 474 def getchunks(self):
475 475 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
476 476 yield self._magicstring
477 477 param = self._paramchunk()
478 478 outdebug(self.ui, 'bundle parameter: %s' % param)
479 479 yield _pack(_fstreamparamsize, len(param))
480 480 if param:
481 481 yield param
482 482
483 483 outdebug(self.ui, 'start of parts')
484 484 for part in self._parts:
485 485 outdebug(self.ui, 'bundle part: "%s"' % part.type)
486 486 for chunk in part.getchunks():
487 487 yield chunk
488 488 outdebug(self.ui, 'end of bundle')
489 489 yield _pack(_fpartheadersize, 0)
490 490
491 491 def _paramchunk(self):
492 492 """return a encoded version of all stream parameters"""
493 493 blocks = []
494 494 for par, value in self._params:
495 495 par = urllib.quote(par)
496 496 if value is not None:
497 497 value = urllib.quote(value)
498 498 par = '%s=%s' % (par, value)
499 499 blocks.append(par)
500 500 return ' '.join(blocks)
501 501
502 502 def salvageoutput(self):
503 503 """return a list with a copy of all output parts in the bundle
504 504
505 505 This is meant to be used during error handling to make sure we preserve
506 506 server output"""
507 507 salvaged = []
508 508 for part in self._parts:
509 509 if part.type.startswith('output'):
510 510 salvaged.append(part.copy())
511 511 return salvaged
512 512
513 513
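A sketch of assembling an outgoing container with the class above; it assumes a ui object is available (`someui` stands for any Mercurial ui object, for instance repo.ui), and the stream parameter name is invented.

    # Illustrative sketch: build an outgoing bundle2 container. `someui` stands
    # for any Mercurial ui object (repo.ui, for instance).
    bundler = bundle20(someui, capabilities={'HG20': ()})
    bundler.addparam('examplename', 'value')   # lower case first letter: advisory
    bundler.newpart('output', data='hello from the server\n', mandatory=False)
    raw = ''.join(bundler.getchunks())         # the binary stream, ready to send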
514 514 class unpackermixin(object):
515 515 """A mixin to extract bytes and struct data from a stream"""
516 516
517 517 def __init__(self, fp):
518 518 self._fp = fp
519 519 self._seekable = (util.safehasattr(fp, 'seek') and
520 520 util.safehasattr(fp, 'tell'))
521 521
522 522 def _unpack(self, format):
523 523 """unpack this struct format from the stream"""
524 524 data = self._readexact(struct.calcsize(format))
525 525 return _unpack(format, data)
526 526
527 527 def _readexact(self, size):
528 528 """read exactly <size> bytes from the stream"""
529 529 return changegroup.readexactly(self._fp, size)
530 530
531 531 def seek(self, offset, whence=0):
532 532 """move the underlying file pointer"""
533 533 if self._seekable:
534 534 return self._fp.seek(offset, whence)
535 535 else:
536 536 raise NotImplementedError(_('File pointer is not seekable'))
537 537
538 538 def tell(self):
539 539 """return the file offset, or None if file is not seekable"""
540 540 if self._seekable:
541 541 try:
542 542 return self._fp.tell()
543 543 except IOError, e:
544 544 if e.errno == errno.ESPIPE:
545 545 self._seekable = False
546 546 else:
547 547 raise
548 548 return None
549 549
550 550 def close(self):
551 551 """close underlying file"""
552 552 if util.safehasattr(self._fp, 'close'):
553 553 return self._fp.close()
554 554
555 555 def getunbundler(ui, fp, header=None):
556 556 """return a valid unbundler object for a given header"""
557 557 if header is None:
558 558 header = changegroup.readexactly(fp, 4)
559 559 magic, version = header[0:2], header[2:4]
560 560 if magic != 'HG':
561 561 raise util.Abort(_('not a Mercurial bundle'))
562 562 unbundlerclass = formatmap.get(version)
563 563 if unbundlerclass is None:
564 564 raise util.Abort(_('unknown bundle version %s') % version)
565 565 unbundler = unbundlerclass(ui, fp)
566 indebug(ui, 'start processing of %s stream\n' % header)
566 indebug(ui, 'start processing of %s stream' % header)
567 567 return unbundler
568 568
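Reading a stream back starts with getunbundler. A minimal, self-contained sketch using an empty bundle (magic string, zero-size parameter block, end-of-stream marker); `someui` again stands for any ui object.

    # Illustrative sketch: unbundle a minimal, empty HG20 stream.
    import struct, StringIO
    emptybundle = 'HG20' + struct.pack('>i', 0) + struct.pack('>i', 0)
    unbundler = getunbundler(someui, StringIO.StringIO(emptybundle))
    assert unbundler.params == {}               # no stream level parameters
    assert list(unbundler.iterparts()) == []    # no parts before the end marker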
569 569 class unbundle20(unpackermixin):
570 570 """interpret a bundle2 stream
571 571
572 572 This class is fed with a binary stream and yields parts through its
573 573 `iterparts` method."""
574 574
575 575 def __init__(self, ui, fp):
576 576 """If header is specified, we do not read it out of the stream."""
577 577 self.ui = ui
578 578 super(unbundle20, self).__init__(fp)
579 579
580 580 @util.propertycache
581 581 def params(self):
582 582 """dictionary of stream level parameters"""
583 indebug(self.ui, 'reading bundle2 stream parameters\n')
583 indebug(self.ui, 'reading bundle2 stream parameters')
584 584 params = {}
585 585 paramssize = self._unpack(_fstreamparamsize)[0]
586 586 if paramssize < 0:
587 587 raise error.BundleValueError('negative bundle param size: %i'
588 588 % paramssize)
589 589 if paramssize:
590 590 for p in self._readexact(paramssize).split(' '):
591 591 p = p.split('=', 1)
592 592 p = [urllib.unquote(i) for i in p]
593 593 if len(p) < 2:
594 594 p.append(None)
595 595 self._processparam(*p)
596 596 params[p[0]] = p[1]
597 597 return params
598 598
599 599 def _processparam(self, name, value):
600 600 """process a parameter, applying its effect if needed
601 601
602 602 Parameters starting with a lower case letter are advisory and will be
603 603 ignored when unknown. Those starting with an upper case letter are
604 604 mandatory, and this function will raise an UnsupportedPartError when unknown.
605 605
606 606 Note: no options are currently supported. Any input will either be
607 607 ignored or fail.
608 608 """
609 609 if not name:
610 610 raise ValueError('empty parameter name')
611 611 if name[0] not in string.letters:
612 612 raise ValueError('non letter first character: %r' % name)
613 613 # Some logic will be later added here to try to process the option for
614 614 # a dict of known parameter.
615 615 if name[0].islower():
616 indebug(self.ui, "ignoring unknown parameter %r\n" % name)
616 indebug(self.ui, "ignoring unknown parameter %r" % name)
617 617 else:
618 618 raise error.UnsupportedPartError(params=(name,))
619 619
620 620
621 621 def iterparts(self):
622 622 """yield all parts contained in the stream"""
623 623 # make sure params have been loaded
624 624 self.params
625 indebug(self.ui, 'start extraction of bundle2 parts\n')
625 indebug(self.ui, 'start extraction of bundle2 parts')
626 626 headerblock = self._readpartheader()
627 627 while headerblock is not None:
628 628 part = unbundlepart(self.ui, headerblock, self._fp)
629 629 yield part
630 630 part.seek(0, 2)
631 631 headerblock = self._readpartheader()
632 indebug(self.ui, 'end of bundle2 stream\n')
632 indebug(self.ui, 'end of bundle2 stream')
633 633
634 634 def _readpartheader(self):
635 635 """reads a part header size and return the bytes blob
636 636
637 637 returns None if empty"""
638 638 headersize = self._unpack(_fpartheadersize)[0]
639 639 if headersize < 0:
640 640 raise error.BundleValueError('negative part header size: %i'
641 641 % headersize)
642 indebug(self.ui, 'part header size: %i\n' % headersize)
642 indebug(self.ui, 'part header size: %i' % headersize)
643 643 if headersize:
644 644 return self._readexact(headersize)
645 645 return None
646 646
647 647 def compressed(self):
648 648 return False
649 649
650 650 formatmap = {'20': unbundle20}
651 651
652 652 class bundlepart(object):
653 653 """A bundle2 part contains application level payload
654 654
655 655 The part `type` is used to route the part to the application level
656 656 handler.
657 657
658 658 The part payload is contained in ``part.data``. It could be raw bytes or a
659 659 generator of byte chunks.
660 660
661 661 You can add parameters to the part using the ``addparam`` method.
662 662 Parameters can be either mandatory (default) or advisory. Remote side
663 663 should be able to safely ignore the advisory ones.
664 664
665 665 Neither data nor parameters can be modified after the generation has begun.
666 666 """
667 667
668 668 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
669 669 data='', mandatory=True):
670 670 validateparttype(parttype)
671 671 self.id = None
672 672 self.type = parttype
673 673 self._data = data
674 674 self._mandatoryparams = list(mandatoryparams)
675 675 self._advisoryparams = list(advisoryparams)
676 676 # checking for duplicated entries
677 677 self._seenparams = set()
678 678 for pname, __ in self._mandatoryparams + self._advisoryparams:
679 679 if pname in self._seenparams:
680 680 raise RuntimeError('duplicated params: %s' % pname)
681 681 self._seenparams.add(pname)
682 682 # status of the part's generation:
683 683 # - None: not started,
684 684 # - False: currently generated,
685 685 # - True: generation done.
686 686 self._generated = None
687 687 self.mandatory = mandatory
688 688
689 689 def copy(self):
690 690 """return a copy of the part
691 691
692 692 The new part has the very same content but no partid assigned yet.
693 693 Parts with generated data cannot be copied."""
694 694 assert not util.safehasattr(self.data, 'next')
695 695 return self.__class__(self.type, self._mandatoryparams,
696 696 self._advisoryparams, self._data, self.mandatory)
697 697
698 698 # methods used to define the part content
699 699 def __setdata(self, data):
700 700 if self._generated is not None:
701 701 raise error.ReadOnlyPartError('part is being generated')
702 702 self._data = data
703 703 def __getdata(self):
704 704 return self._data
705 705 data = property(__getdata, __setdata)
706 706
707 707 @property
708 708 def mandatoryparams(self):
709 709 # make it an immutable tuple to force people through ``addparam``
710 710 return tuple(self._mandatoryparams)
711 711
712 712 @property
713 713 def advisoryparams(self):
714 714 # make it an immutable tuple to force people through ``addparam``
715 715 return tuple(self._advisoryparams)
716 716
717 717 def addparam(self, name, value='', mandatory=True):
718 718 if self._generated is not None:
719 719 raise error.ReadOnlyPartError('part is being generated')
720 720 if name in self._seenparams:
721 721 raise ValueError('duplicated params: %s' % name)
722 722 self._seenparams.add(name)
723 723 params = self._advisoryparams
724 724 if mandatory:
725 725 params = self._mandatoryparams
726 726 params.append((name, value))
727 727
728 728 # methods used to generate the bundle2 stream
729 729 def getchunks(self):
730 730 if self._generated is not None:
731 731 raise RuntimeError('part can only be consumed once')
732 732 self._generated = False
733 733 #### header
734 734 if self.mandatory:
735 735 parttype = self.type.upper()
736 736 else:
737 737 parttype = self.type.lower()
738 738 ## parttype
739 739 header = [_pack(_fparttypesize, len(parttype)),
740 740 parttype, _pack(_fpartid, self.id),
741 741 ]
742 742 ## parameters
743 743 # count
744 744 manpar = self.mandatoryparams
745 745 advpar = self.advisoryparams
746 746 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
747 747 # size
748 748 parsizes = []
749 749 for key, value in manpar:
750 750 parsizes.append(len(key))
751 751 parsizes.append(len(value))
752 752 for key, value in advpar:
753 753 parsizes.append(len(key))
754 754 parsizes.append(len(value))
755 755 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
756 756 header.append(paramsizes)
757 757 # key, value
758 758 for key, value in manpar:
759 759 header.append(key)
760 760 header.append(value)
761 761 for key, value in advpar:
762 762 header.append(key)
763 763 header.append(value)
764 764 ## finalize header
765 765 headerchunk = ''.join(header)
766 766 yield _pack(_fpartheadersize, len(headerchunk))
767 767 yield headerchunk
768 768 ## payload
769 769 try:
770 770 for chunk in self._payloadchunks():
771 771 yield _pack(_fpayloadsize, len(chunk))
772 772 yield chunk
773 773 except BaseException, exc:
774 774 # backup exception data for later
775 775 exc_info = sys.exc_info()
776 776 msg = 'unexpected error: %s' % exc
777 777 interpart = bundlepart('error:abort', [('message', msg)],
778 778 mandatory=False)
779 779 interpart.id = 0
780 780 yield _pack(_fpayloadsize, -1)
781 781 for chunk in interpart.getchunks():
782 782 yield chunk
783 783 # abort current part payload
784 784 yield _pack(_fpayloadsize, 0)
785 785 raise exc_info[0], exc_info[1], exc_info[2]
786 786 # end of payload
787 787 yield _pack(_fpayloadsize, 0)
788 788 self._generated = True
789 789
790 790 def _payloadchunks(self):
791 791 """yield chunks of a the part payload
792 792
793 793 Exists to handle the different methods to provide data to a part."""
794 794 # we only support fixed size data now.
795 795 # This will be improved in the future.
796 796 if util.safehasattr(self.data, 'next'):
797 797 buff = util.chunkbuffer(self.data)
798 798 chunk = buff.read(preferedchunksize)
799 799 while chunk:
800 800 yield chunk
801 801 chunk = buff.read(preferedchunksize)
802 802 elif len(self.data):
803 803 yield self.data
804 804
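To show the part API described above in isolation, here is a sketch that builds a single part by hand instead of going through bundle20.newpart; the advisory parameter name is invented and the pushkey values are placeholders.

    # Illustrative sketch: encode one part on its own. Values are placeholders.
    part = bundlepart('pushkey',
                      mandatoryparams=[('namespace', 'bookmarks'),
                                       ('key', '@'),
                                       ('old', ''),
                                       ('new', '0' * 40)])
    part.addparam('example-note', 'advisory value', mandatory=False)
    part.id = 0                        # normally assigned by bundle20.addpart()
    raw = ''.join(part.getchunks())    # binary encoding of this single part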
805 805
806 806 flaginterrupt = -1
807 807
808 808 class interrupthandler(unpackermixin):
809 809 """read one part and process it with restricted capability
810 810
811 811 This allows transmitting exceptions raised on the producer side during part
812 812 iteration while the consumer is reading a part.
813 813
814 814 Parts processed in this manner only have access to a ui object."""
815 815
816 816 def __init__(self, ui, fp):
817 817 super(interrupthandler, self).__init__(fp)
818 818 self.ui = ui
819 819
820 820 def _readpartheader(self):
821 821 """reads a part header size and return the bytes blob
822 822
823 823 returns None if empty"""
824 824 headersize = self._unpack(_fpartheadersize)[0]
825 825 if headersize < 0:
826 826 raise error.BundleValueError('negative part header size: %i'
827 827 % headersize)
828 828 indebug(self.ui, 'part header size: %i\n' % headersize)
829 829 if headersize:
830 830 return self._readexact(headersize)
831 831 return None
832 832
833 833 def __call__(self):
834 indebug(self.ui, 'bundle2 stream interruption, looking for a part.\n')
834 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
835 835 headerblock = self._readpartheader()
836 836 if headerblock is None:
837 indebug(self.ui, 'no part found during interruption.\n')
837 indebug(self.ui, 'no part found during interruption.')
838 838 return
839 839 part = unbundlepart(self.ui, headerblock, self._fp)
840 840 op = interruptoperation(self.ui)
841 841 _processpart(op, part)
842 842
843 843 class interruptoperation(object):
844 844 """A limited operation to be use by part handler during interruption
845 845
846 846 It only have access to an ui object.
847 847 """
848 848
849 849 def __init__(self, ui):
850 850 self.ui = ui
851 851 self.reply = None
852 852 self.captureoutput = False
853 853
854 854 @property
855 855 def repo(self):
856 856 raise RuntimeError('no repo access from stream interruption')
857 857
858 858 def gettransaction(self):
859 859 raise TransactionUnavailable('no repo access from stream interruption')
860 860
861 861 class unbundlepart(unpackermixin):
862 862 """a bundle part read from a bundle"""
863 863
864 864 def __init__(self, ui, header, fp):
865 865 super(unbundlepart, self).__init__(fp)
866 866 self.ui = ui
867 867 # unbundle state attr
868 868 self._headerdata = header
869 869 self._headeroffset = 0
870 870 self._initialized = False
871 871 self.consumed = False
872 872 # part data
873 873 self.id = None
874 874 self.type = None
875 875 self.mandatoryparams = None
876 876 self.advisoryparams = None
877 877 self.params = None
878 878 self.mandatorykeys = ()
879 879 self._payloadstream = None
880 880 self._readheader()
881 881 self._mandatory = None
882 882 self._chunkindex = [] #(payload, file) position tuples for chunk starts
883 883 self._pos = 0
884 884
885 885 def _fromheader(self, size):
886 886 """return the next <size> byte from the header"""
887 887 offset = self._headeroffset
888 888 data = self._headerdata[offset:(offset + size)]
889 889 self._headeroffset = offset + size
890 890 return data
891 891
892 892 def _unpackheader(self, format):
893 893 """read given format from header
894 894
895 895 This automatically computes the size of the format to read."""
896 896 data = self._fromheader(struct.calcsize(format))
897 897 return _unpack(format, data)
898 898
899 899 def _initparams(self, mandatoryparams, advisoryparams):
900 900 """internal function to setup all logic related parameters"""
901 901 # make it read only to prevent people touching it by mistake.
902 902 self.mandatoryparams = tuple(mandatoryparams)
903 903 self.advisoryparams = tuple(advisoryparams)
904 904 # user friendly UI
905 905 self.params = dict(self.mandatoryparams)
906 906 self.params.update(dict(self.advisoryparams))
907 907 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
908 908
909 909 def _payloadchunks(self, chunknum=0):
910 910 '''seek to specified chunk and start yielding data'''
911 911 if len(self._chunkindex) == 0:
912 912 assert chunknum == 0, 'Must start with chunk 0'
913 913 self._chunkindex.append((0, super(unbundlepart, self).tell()))
914 914 else:
915 915 assert chunknum < len(self._chunkindex), \
916 916 'Unknown chunk %d' % chunknum
917 917 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
918 918
919 919 pos = self._chunkindex[chunknum][0]
920 920 payloadsize = self._unpack(_fpayloadsize)[0]
921 indebug(self.ui, 'payload chunk size: %i\n' % payloadsize)
921 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
922 922 while payloadsize:
923 923 if payloadsize == flaginterrupt:
924 924 # interruption detection, the handler will now read a
925 925 # single part and process it.
926 926 interrupthandler(self.ui, self._fp)()
927 927 elif payloadsize < 0:
928 928 msg = 'negative payload chunk size: %i' % payloadsize
929 929 raise error.BundleValueError(msg)
930 930 else:
931 931 result = self._readexact(payloadsize)
932 932 chunknum += 1
933 933 pos += payloadsize
934 934 if chunknum == len(self._chunkindex):
935 935 self._chunkindex.append((pos,
936 936 super(unbundlepart, self).tell()))
937 937 yield result
938 938 payloadsize = self._unpack(_fpayloadsize)[0]
939 indebug(self.ui, 'payload chunk size: %i\n' % payloadsize)
939 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
940 940
941 941 def _findchunk(self, pos):
942 942 '''for a given payload position, return a chunk number and offset'''
943 943 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
944 944 if ppos == pos:
945 945 return chunk, 0
946 946 elif ppos > pos:
947 947 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
948 948 raise ValueError('Unknown chunk')
949 949
950 950 def _readheader(self):
951 951 """read the header and setup the object"""
952 952 typesize = self._unpackheader(_fparttypesize)[0]
953 953 self.type = self._fromheader(typesize)
954 indebug(self.ui, 'part type: "%s"\n' % self.type)
954 indebug(self.ui, 'part type: "%s"' % self.type)
955 955 self.id = self._unpackheader(_fpartid)[0]
956 indebug(self.ui, 'part id: "%s"\n' % self.id)
956 indebug(self.ui, 'part id: "%s"' % self.id)
957 957 # extract mandatory bit from type
958 958 self.mandatory = (self.type != self.type.lower())
959 959 self.type = self.type.lower()
960 960 ## reading parameters
961 961 # param count
962 962 mancount, advcount = self._unpackheader(_fpartparamcount)
963 indebug(self.ui, 'part parameters: %i\n' % (mancount + advcount))
963 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
964 964 # param size
965 965 fparamsizes = _makefpartparamsizes(mancount + advcount)
966 966 paramsizes = self._unpackheader(fparamsizes)
967 967 # make it a list of pairs again
968 968 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
969 969 # split mandatory from advisory
970 970 mansizes = paramsizes[:mancount]
971 971 advsizes = paramsizes[mancount:]
972 972 # retrieve param value
973 973 manparams = []
974 974 for key, value in mansizes:
975 975 manparams.append((self._fromheader(key), self._fromheader(value)))
976 976 advparams = []
977 977 for key, value in advsizes:
978 978 advparams.append((self._fromheader(key), self._fromheader(value)))
979 979 self._initparams(manparams, advparams)
980 980 ## part payload
981 981 self._payloadstream = util.chunkbuffer(self._payloadchunks())
982 982 # we read the data, tell it
983 983 self._initialized = True
984 984
985 985 def read(self, size=None):
986 986 """read payload data"""
987 987 if not self._initialized:
988 988 self._readheader()
989 989 if size is None:
990 990 data = self._payloadstream.read()
991 991 else:
992 992 data = self._payloadstream.read(size)
993 993 if size is None or len(data) < size:
994 994 self.consumed = True
995 995 self._pos += len(data)
996 996 return data
997 997
998 998 def tell(self):
999 999 return self._pos
1000 1000
1001 1001 def seek(self, offset, whence=0):
1002 1002 if whence == 0:
1003 1003 newpos = offset
1004 1004 elif whence == 1:
1005 1005 newpos = self._pos + offset
1006 1006 elif whence == 2:
1007 1007 if not self.consumed:
1008 1008 self.read()
1009 1009 newpos = self._chunkindex[-1][0] - offset
1010 1010 else:
1011 1011 raise ValueError('Unknown whence value: %r' % (whence,))
1012 1012
1013 1013 if newpos > self._chunkindex[-1][0] and not self.consumed:
1014 1014 self.read()
1015 1015 if not 0 <= newpos <= self._chunkindex[-1][0]:
1016 1016 raise ValueError('Offset out of range')
1017 1017
1018 1018 if self._pos != newpos:
1019 1019 chunk, internaloffset = self._findchunk(newpos)
1020 1020 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1021 1021 adjust = self.read(internaloffset)
1022 1022 if len(adjust) != internaloffset:
1023 1023 raise util.Abort(_('Seek failed\n'))
1024 1024 self._pos = newpos
1025 1025
1026 1026 # These are only the static capabilities.
1027 1027 # Check the 'getrepocaps' function for the rest.
1028 1028 capabilities = {'HG20': (),
1029 1029 'listkeys': (),
1030 1030 'pushkey': (),
1031 1031 'digests': tuple(sorted(util.DIGESTS.keys())),
1032 1032 'remote-changegroup': ('http', 'https'),
1033 1033 }
1034 1034
1035 1035 def getrepocaps(repo, allowpushback=False):
1036 1036 """return the bundle2 capabilities for a given repo
1037 1037
1038 1038 Exists to allow extensions (like evolution) to mutate the capabilities.
1039 1039 """
1040 1040 caps = capabilities.copy()
1041 1041 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1042 1042 if obsolete.isenabled(repo, obsolete.exchangeopt):
1043 1043 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1044 1044 caps['obsmarkers'] = supportedformat
1045 1045 if allowpushback:
1046 1046 caps['pushback'] = ()
1047 1047 return caps
1048 1048
1049 1049 def bundle2caps(remote):
1050 1050 """return the bundle capabilities of a peer as dict"""
1051 1051 raw = remote.capable('bundle2')
1052 1052 if not raw and raw != '':
1053 1053 return {}
1054 1054 capsblob = urllib.unquote(remote.capable('bundle2'))
1055 1055 return decodecaps(capsblob)
1056 1056
1057 1057 def obsmarkersversion(caps):
1058 1058 """extract the list of supported obsmarkers versions from a bundle2caps dict
1059 1059 """
1060 1060 obscaps = caps.get('obsmarkers', ())
1061 1061 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1062 1062
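Taken together, the three helpers above support capability negotiation; a hedged sketch, assuming `repo` is a local repository and `remote` a peer object.

    # Illustrative sketch of capability negotiation with the helpers above.
    localcaps = getrepocaps(repo, allowpushback=True)
    remotecaps = bundle2caps(remote)
    if obsmarkersversion(remotecaps):
        repo.ui.note('peer advertises obsmarkers exchange support\n')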
1063 1063 @parthandler('changegroup', ('version',))
1064 1064 def handlechangegroup(op, inpart):
1065 1065 """apply a changegroup part on the repo
1066 1066
1067 1067 This is a very early implementation that will be massively reworked before
1068 1068 being inflicted on any end-user.
1069 1069 """
1070 1070 # Make sure we trigger a transaction creation
1071 1071 #
1072 1072 # The addchangegroup function will get a transaction object by itself, but
1073 1073 # we need to make sure we trigger the creation of a transaction object used
1074 1074 # for the whole processing scope.
1075 1075 op.gettransaction()
1076 1076 unpackerversion = inpart.params.get('version', '01')
1077 1077 # We should raise an appropriate exception here
1078 1078 unpacker = changegroup.packermap[unpackerversion][1]
1079 1079 cg = unpacker(inpart, 'UN')
1080 1080 # the source and url passed here are overwritten by the ones contained in
1081 1081 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1082 1082 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1083 1083 op.records.add('changegroup', {'return': ret})
1084 1084 if op.reply is not None:
1085 1085 # This is definitely not the final form of this
1086 1086 # return. But one needs to start somewhere.
1087 1087 part = op.reply.newpart('reply:changegroup', mandatory=False)
1088 1088 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1089 1089 part.addparam('return', '%i' % ret, mandatory=False)
1090 1090 assert not inpart.read()
1091 1091
1092 1092 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1093 1093 ['digest:%s' % k for k in util.DIGESTS.keys()])
1094 1094 @parthandler('remote-changegroup', _remotechangegroupparams)
1095 1095 def handleremotechangegroup(op, inpart):
1096 1096 """apply a bundle10 on the repo, given an url and validation information
1097 1097
1098 1098 All the information about the remote bundle to import is given as
1099 1099 parameters. The parameters include:
1100 1100 - url: the url to the bundle10.
1101 1101 - size: the bundle10 file size. It is used to validate that what was
1102 1102 retrieved by the client matches the server's knowledge about the bundle.
1103 1103 - digests: a space separated list of the digest types provided as
1104 1104 parameters.
1105 1105 - digest:<digest-type>: the hexadecimal representation of the digest with
1106 1106 that name. Like the size, it is used to validate that what was retrieved by
1107 1107 the client matches what the server knows about the bundle.
1108 1108
1109 1109 When multiple digest types are given, all of them are checked.
1110 1110 """
1111 1111 try:
1112 1112 raw_url = inpart.params['url']
1113 1113 except KeyError:
1114 1114 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1115 1115 parsed_url = util.url(raw_url)
1116 1116 if parsed_url.scheme not in capabilities['remote-changegroup']:
1117 1117 raise util.Abort(_('remote-changegroup does not support %s urls') %
1118 1118 parsed_url.scheme)
1119 1119
1120 1120 try:
1121 1121 size = int(inpart.params['size'])
1122 1122 except ValueError:
1123 1123 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1124 1124 % 'size')
1125 1125 except KeyError:
1126 1126 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1127 1127
1128 1128 digests = {}
1129 1129 for typ in inpart.params.get('digests', '').split():
1130 1130 param = 'digest:%s' % typ
1131 1131 try:
1132 1132 value = inpart.params[param]
1133 1133 except KeyError:
1134 1134 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1135 1135 param)
1136 1136 digests[typ] = value
1137 1137
1138 1138 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1139 1139
1140 1140 # Make sure we trigger a transaction creation
1141 1141 #
1142 1142 # The addchangegroup function will get a transaction object by itself, but
1143 1143 # we need to make sure we trigger the creation of a transaction object used
1144 1144 # for the whole processing scope.
1145 1145 op.gettransaction()
1146 1146 import exchange
1147 1147 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1148 1148 if not isinstance(cg, changegroup.cg1unpacker):
1149 1149 raise util.Abort(_('%s: not a bundle version 1.0') %
1150 1150 util.hidepassword(raw_url))
1151 1151 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1152 1152 op.records.add('changegroup', {'return': ret})
1153 1153 if op.reply is not None:
1154 1154 # This is definitely not the final form of this
1155 1155 # return. But one needs to start somewhere.
1156 1156 part = op.reply.newpart('reply:changegroup')
1157 1157 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1158 1158 part.addparam('return', '%i' % ret, mandatory=False)
1159 1159 try:
1160 1160 real_part.validate()
1161 1161 except util.Abort, e:
1162 1162 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1163 1163 (util.hidepassword(raw_url), str(e)))
1164 1164 assert not inpart.read()
1165 1165
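On the sending side, a part matching the parameters documented above could be emitted as sketched below; the URL, size and digest values are placeholders, and `bundler` is assumed to be a bundle20 instance as in the earlier sketch.

    # Illustrative sketch: emit a 'remote-changegroup' part. All values are
    # placeholders; `bundler` is a bundle20 instance.
    rcgpart = bundler.newpart('remote-changegroup')
    rcgpart.addparam('url', 'https://example.com/pull.hg')
    rcgpart.addparam('size', '12345')
    rcgpart.addparam('digests', 'sha1')
    rcgpart.addparam('digest:sha1', 'a' * 40)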
1166 1166 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1167 1167 def handlereplychangegroup(op, inpart):
1168 1168 ret = int(inpart.params['return'])
1169 1169 replyto = int(inpart.params['in-reply-to'])
1170 1170 op.records.add('changegroup', {'return': ret}, replyto)
1171 1171
1172 1172 @parthandler('check:heads')
1173 1173 def handlecheckheads(op, inpart):
1174 1174 """check that head of the repo did not change
1175 1175
1176 1176 This is used to detect a push race when using unbundle.
1177 1177 This replaces the "heads" argument of unbundle."""
1178 1178 h = inpart.read(20)
1179 1179 heads = []
1180 1180 while len(h) == 20:
1181 1181 heads.append(h)
1182 1182 h = inpart.read(20)
1183 1183 assert not h
1184 1184 if heads != op.repo.heads():
1185 1185 raise error.PushRaced('repository changed while pushing - '
1186 1186 'please try again')
1187 1187
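On the sending side, the payload of this part is simply the concatenation of the 20-byte nodes the client last saw as the remote's heads; a sketch, assuming `bundler` from the earlier sketch and a `remoteheads` list of binary nodes.

    # Illustrative sketch: guard a push against races. `remoteheads` is assumed
    # to hold the 20-byte binary nodes of the heads last seen on the remote.
    bundler.newpart('check:heads', data=''.join(remoteheads))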
1188 1188 @parthandler('output')
1189 1189 def handleoutput(op, inpart):
1190 1190 """forward output captured on the server to the client"""
1191 1191 for line in inpart.read().splitlines():
1192 1192 op.ui.status(('remote: %s\n' % line))
1193 1193
1194 1194 @parthandler('replycaps')
1195 1195 def handlereplycaps(op, inpart):
1196 1196 """Notify that a reply bundle should be created
1197 1197
1198 1198 The payload contains the capabilities information for the reply"""
1199 1199 caps = decodecaps(inpart.read())
1200 1200 if op.reply is None:
1201 1201 op.reply = bundle20(op.ui, caps)
1202 1202
1203 1203 @parthandler('error:abort', ('message', 'hint'))
1204 1204 def handleerrorabort(op, inpart):
1205 1205 """Used to transmit abort error over the wire"""
1206 1206 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1207 1207
1208 1208 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1209 1209 def handleerrorunsupportedcontent(op, inpart):
1210 1210 """Used to transmit unknown content error over the wire"""
1211 1211 kwargs = {}
1212 1212 parttype = inpart.params.get('parttype')
1213 1213 if parttype is not None:
1214 1214 kwargs['parttype'] = parttype
1215 1215 params = inpart.params.get('params')
1216 1216 if params is not None:
1217 1217 kwargs['params'] = params.split('\0')
1218 1218
1219 1219 raise error.UnsupportedPartError(**kwargs)
1220 1220
1221 1221 @parthandler('error:pushraced', ('message',))
1222 1222 def handleerrorpushraced(op, inpart):
1223 1223 """Used to transmit push race error over the wire"""
1224 1224 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1225 1225
1226 1226 @parthandler('listkeys', ('namespace',))
1227 1227 def handlelistkeys(op, inpart):
1228 1228 """retrieve pushkey namespace content stored in a bundle2"""
1229 1229 namespace = inpart.params['namespace']
1230 1230 r = pushkey.decodekeys(inpart.read())
1231 1231 op.records.add('listkeys', (namespace, r))
1232 1232
1233 1233 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1234 1234 def handlepushkey(op, inpart):
1235 1235 """process a pushkey request"""
1236 1236 dec = pushkey.decode
1237 1237 namespace = dec(inpart.params['namespace'])
1238 1238 key = dec(inpart.params['key'])
1239 1239 old = dec(inpart.params['old'])
1240 1240 new = dec(inpart.params['new'])
1241 1241 ret = op.repo.pushkey(namespace, key, old, new)
1242 1242 record = {'namespace': namespace,
1243 1243 'key': key,
1244 1244 'old': old,
1245 1245 'new': new}
1246 1246 op.records.add('pushkey', record)
1247 1247 if op.reply is not None:
1248 1248 rpart = op.reply.newpart('reply:pushkey')
1249 1249 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1250 1250 rpart.addparam('return', '%i' % ret, mandatory=False)
1251 1251
1252 1252 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1253 1253 def handlepushkeyreply(op, inpart):
1254 1254 """retrieve the result of a pushkey request"""
1255 1255 ret = int(inpart.params['return'])
1256 1256 partid = int(inpart.params['in-reply-to'])
1257 1257 op.records.add('pushkey', {'return': ret}, partid)
1258 1258
1259 1259 @parthandler('obsmarkers')
1260 1260 def handleobsmarker(op, inpart):
1261 1261 """add a stream of obsmarkers to the repo"""
1262 1262 tr = op.gettransaction()
1263 1263 markerdata = inpart.read()
1264 1264 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1265 1265 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1266 1266 % len(markerdata))
1267 1267 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1268 1268 if new:
1269 1269 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1270 1270 op.records.add('obsmarkers', {'new': new})
1271 1271 if op.reply is not None:
1272 1272 rpart = op.reply.newpart('reply:obsmarkers')
1273 1273 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1274 1274 rpart.addparam('new', '%i' % new, mandatory=False)
1275 1275
1276 1276
1277 1277 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1278 1278 def handlepushkeyreply(op, inpart):
1279 1279 """retrieve the result of a pushkey request"""
1280 1280 ret = int(inpart.params['new'])
1281 1281 partid = int(inpart.params['in-reply-to'])
1282 1282 op.records.add('obsmarkers', {'new': ret}, partid)