##// END OF EJS Templates
bundle2: add a 'salvageoutput' method on bundle20...
Pierre-Yves David -
r24794:21f2e8f4 default
parent child Browse files
Show More
@@ -1,1248 +1,1260 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 148 import errno
149 149 import sys
150 150 import util
151 151 import struct
152 152 import urllib
153 153 import string
154 154 import obsolete
155 155 import pushkey
156 156 import url
157 157 import re
158 158
159 159 import changegroup, error
160 160 from i18n import _
161 161
162 162 _pack = struct.pack
163 163 _unpack = struct.unpack
164 164
165 165 _fstreamparamsize = '>i'
166 166 _fpartheadersize = '>i'
167 167 _fparttypesize = '>B'
168 168 _fpartid = '>I'
169 169 _fpayloadsize = '>i'
170 170 _fpartparamcount = '>BB'
171 171
172 172 preferedchunksize = 4096
173 173
174 174 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 175
176 176 def validateparttype(parttype):
177 177 """raise ValueError if a parttype contains invalid character"""
178 178 if _parttypeforbidden.search(parttype):
179 179 raise ValueError(parttype)
180 180
181 181 def _makefpartparamsizes(nbparams):
182 182 """return a struct format to read part parameter sizes
183 183
184 184 The number parameters is variable so we need to build that format
185 185 dynamically.
186 186 """
187 187 return '>'+('BB'*nbparams)
188 188
189 189 parthandlermapping = {}
190 190
191 191 def parthandler(parttype, params=()):
192 192 """decorator that register a function as a bundle2 part handler
193 193
194 194 eg::
195 195
196 196 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
197 197 def myparttypehandler(...):
198 198 '''process a part of type "my part".'''
199 199 ...
200 200 """
201 201 validateparttype(parttype)
202 202 def _decorator(func):
203 203 lparttype = parttype.lower() # enforce lower case matching.
204 204 assert lparttype not in parthandlermapping
205 205 parthandlermapping[lparttype] = func
206 206 func.params = frozenset(params)
207 207 return func
208 208 return _decorator
209 209
210 210 class unbundlerecords(object):
211 211 """keep record of what happens during and unbundle
212 212
213 213 New records are added using `records.add('cat', obj)`. Where 'cat' is a
214 214 category of record and obj is an arbitrary object.
215 215
216 216 `records['cat']` will return all entries of this category 'cat'.
217 217
218 218 Iterating on the object itself will yield `('category', obj)` tuples
219 219 for all entries.
220 220
221 221 All iterations happens in chronological order.
222 222 """
223 223
224 224 def __init__(self):
225 225 self._categories = {}
226 226 self._sequences = []
227 227 self._replies = {}
228 228
229 229 def add(self, category, entry, inreplyto=None):
230 230 """add a new record of a given category.
231 231
232 232 The entry can then be retrieved in the list returned by
233 233 self['category']."""
234 234 self._categories.setdefault(category, []).append(entry)
235 235 self._sequences.append((category, entry))
236 236 if inreplyto is not None:
237 237 self.getreplies(inreplyto).add(category, entry)
238 238
239 239 def getreplies(self, partid):
240 240 """get the records that are replies to a specific part"""
241 241 return self._replies.setdefault(partid, unbundlerecords())
242 242
243 243 def __getitem__(self, cat):
244 244 return tuple(self._categories.get(cat, ()))
245 245
246 246 def __iter__(self):
247 247 return iter(self._sequences)
248 248
249 249 def __len__(self):
250 250 return len(self._sequences)
251 251
252 252 def __nonzero__(self):
253 253 return bool(self._sequences)
254 254
255 255 class bundleoperation(object):
256 256 """an object that represents a single bundling process
257 257
258 258 Its purpose is to carry unbundle-related objects and states.
259 259
260 260 A new object should be created at the beginning of each bundle processing.
261 261 The object is to be returned by the processing function.
262 262
263 263 The object has very little content now it will ultimately contain:
264 264 * an access to the repo the bundle is applied to,
265 265 * a ui object,
266 266 * a way to retrieve a transaction to add changes to the repo,
267 267 * a way to record the result of processing each part,
268 268 * a way to construct a bundle response when applicable.
269 269 """
270 270
271 271 def __init__(self, repo, transactiongetter):
272 272 self.repo = repo
273 273 self.ui = repo.ui
274 274 self.records = unbundlerecords()
275 275 self.gettransaction = transactiongetter
276 276 self.reply = None
277 277
278 278 class TransactionUnavailable(RuntimeError):
279 279 pass
280 280
281 281 def _notransaction():
282 282 """default method to get a transaction while processing a bundle
283 283
284 284 Raise an exception to highlight the fact that no transaction was expected
285 285 to be created"""
286 286 raise TransactionUnavailable()
287 287
288 288 def processbundle(repo, unbundler, transactiongetter=None):
289 289 """This function process a bundle, apply effect to/from a repo
290 290
291 291 It iterates over each part then searches for and uses the proper handling
292 292 code to process the part. Parts are processed in order.
293 293
294 294 This is very early version of this function that will be strongly reworked
295 295 before final usage.
296 296
297 297 Unknown Mandatory part will abort the process.
298 298 """
299 299 if transactiongetter is None:
300 300 transactiongetter = _notransaction
301 301 op = bundleoperation(repo, transactiongetter)
302 302 # todo:
303 303 # - replace this is a init function soon.
304 304 # - exception catching
305 305 unbundler.params
306 306 iterparts = unbundler.iterparts()
307 307 part = None
308 308 try:
309 309 for part in iterparts:
310 310 _processpart(op, part)
311 311 except Exception, exc:
312 312 for part in iterparts:
313 313 # consume the bundle content
314 314 part.seek(0, 2)
315 315 # Small hack to let caller code distinguish exceptions from bundle2
316 316 # processing from processing the old format. This is mostly
317 317 # needed to handle different return codes to unbundle according to the
318 318 # type of bundle. We should probably clean up or drop this return code
319 319 # craziness in a future version.
320 320 exc.duringunbundle2 = True
321 321 raise
322 322 return op
323 323
324 324 def _processpart(op, part):
325 325 """process a single part from a bundle
326 326
327 327 The part is guaranteed to have been fully consumed when the function exits
328 328 (even if an exception is raised)."""
329 329 try:
330 330 try:
331 331 handler = parthandlermapping.get(part.type)
332 332 if handler is None:
333 333 raise error.UnsupportedPartError(parttype=part.type)
334 334 op.ui.debug('found a handler for part %r\n' % part.type)
335 335 unknownparams = part.mandatorykeys - handler.params
336 336 if unknownparams:
337 337 unknownparams = list(unknownparams)
338 338 unknownparams.sort()
339 339 raise error.UnsupportedPartError(parttype=part.type,
340 340 params=unknownparams)
341 341 except error.UnsupportedPartError, exc:
342 342 if part.mandatory: # mandatory parts
343 343 raise
344 344 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
345 345 return # skip to part processing
346 346
347 347 # handler is called outside the above try block so that we don't
348 348 # risk catching KeyErrors from anything other than the
349 349 # parthandlermapping lookup (any KeyError raised by handler()
350 350 # itself represents a defect of a different variety).
351 351 output = None
352 352 if op.reply is not None:
353 353 op.ui.pushbuffer(error=True)
354 354 output = ''
355 355 try:
356 356 handler(op, part)
357 357 finally:
358 358 if output is not None:
359 359 output = op.ui.popbuffer()
360 360 if output:
361 361 outpart = op.reply.newpart('output', data=output,
362 362 mandatory=False)
363 363 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
364 364 finally:
365 365 # consume the part content to not corrupt the stream.
366 366 part.seek(0, 2)
367 367
368 368
369 369 def decodecaps(blob):
370 370 """decode a bundle2 caps bytes blob into a dictionary
371 371
372 372 The blob is a list of capabilities (one per line)
373 373 Capabilities may have values using a line of the form::
374 374
375 375 capability=value1,value2,value3
376 376
377 377 The values are always a list."""
378 378 caps = {}
379 379 for line in blob.splitlines():
380 380 if not line:
381 381 continue
382 382 if '=' not in line:
383 383 key, vals = line, ()
384 384 else:
385 385 key, vals = line.split('=', 1)
386 386 vals = vals.split(',')
387 387 key = urllib.unquote(key)
388 388 vals = [urllib.unquote(v) for v in vals]
389 389 caps[key] = vals
390 390 return caps
391 391
392 392 def encodecaps(caps):
393 393 """encode a bundle2 caps dictionary into a bytes blob"""
394 394 chunks = []
395 395 for ca in sorted(caps):
396 396 vals = caps[ca]
397 397 ca = urllib.quote(ca)
398 398 vals = [urllib.quote(v) for v in vals]
399 399 if vals:
400 400 ca = "%s=%s" % (ca, ','.join(vals))
401 401 chunks.append(ca)
402 402 return '\n'.join(chunks)
403 403
404 404 class bundle20(object):
405 405 """represent an outgoing bundle2 container
406 406
407 407 Use the `addparam` method to add stream level parameter. and `newpart` to
408 408 populate it. Then call `getchunks` to retrieve all the binary chunks of
409 409 data that compose the bundle2 container."""
410 410
411 411 _magicstring = 'HG20'
412 412
413 413 def __init__(self, ui, capabilities=()):
414 414 self.ui = ui
415 415 self._params = []
416 416 self._parts = []
417 417 self.capabilities = dict(capabilities)
418 418
419 419 @property
420 420 def nbparts(self):
421 421 """total number of parts added to the bundler"""
422 422 return len(self._parts)
423 423
424 424 # methods used to defines the bundle2 content
425 425 def addparam(self, name, value=None):
426 426 """add a stream level parameter"""
427 427 if not name:
428 428 raise ValueError('empty parameter name')
429 429 if name[0] not in string.letters:
430 430 raise ValueError('non letter first character: %r' % name)
431 431 self._params.append((name, value))
432 432
433 433 def addpart(self, part):
434 434 """add a new part to the bundle2 container
435 435
436 436 Parts contains the actual applicative payload."""
437 437 assert part.id is None
438 438 part.id = len(self._parts) # very cheap counter
439 439 self._parts.append(part)
440 440
441 441 def newpart(self, typeid, *args, **kwargs):
442 442 """create a new part and add it to the containers
443 443
444 444 As the part is directly added to the containers. For now, this means
445 445 that any failure to properly initialize the part after calling
446 446 ``newpart`` should result in a failure of the whole bundling process.
447 447
448 448 You can still fall back to manually create and add if you need better
449 449 control."""
450 450 part = bundlepart(typeid, *args, **kwargs)
451 451 self.addpart(part)
452 452 return part
453 453
454 454 # methods used to generate the bundle2 stream
455 455 def getchunks(self):
456 456 self.ui.debug('start emission of %s stream\n' % self._magicstring)
457 457 yield self._magicstring
458 458 param = self._paramchunk()
459 459 self.ui.debug('bundle parameter: %s\n' % param)
460 460 yield _pack(_fstreamparamsize, len(param))
461 461 if param:
462 462 yield param
463 463
464 464 self.ui.debug('start of parts\n')
465 465 for part in self._parts:
466 466 self.ui.debug('bundle part: "%s"\n' % part.type)
467 467 for chunk in part.getchunks():
468 468 yield chunk
469 469 self.ui.debug('end of bundle\n')
470 470 yield _pack(_fpartheadersize, 0)
471 471
472 472 def _paramchunk(self):
473 473 """return a encoded version of all stream parameters"""
474 474 blocks = []
475 475 for par, value in self._params:
476 476 par = urllib.quote(par)
477 477 if value is not None:
478 478 value = urllib.quote(value)
479 479 par = '%s=%s' % (par, value)
480 480 blocks.append(par)
481 481 return ' '.join(blocks)
482 482
483 def salvageoutput(self):
484 """return a list with a copy of all output parts in the bundle
485
486 This is meant to be used during error handling to make sure we preserve
487 server output"""
488 salvaged = []
489 for part in self._parts:
490 if part.type.startswith('output'):
491 salvaged.append(part.copy())
492 return salvaged
493
494
483 495 class unpackermixin(object):
484 496 """A mixin to extract bytes and struct data from a stream"""
485 497
486 498 def __init__(self, fp):
487 499 self._fp = fp
488 500 self._seekable = (util.safehasattr(fp, 'seek') and
489 501 util.safehasattr(fp, 'tell'))
490 502
491 503 def _unpack(self, format):
492 504 """unpack this struct format from the stream"""
493 505 data = self._readexact(struct.calcsize(format))
494 506 return _unpack(format, data)
495 507
496 508 def _readexact(self, size):
497 509 """read exactly <size> bytes from the stream"""
498 510 return changegroup.readexactly(self._fp, size)
499 511
500 512 def seek(self, offset, whence=0):
501 513 """move the underlying file pointer"""
502 514 if self._seekable:
503 515 return self._fp.seek(offset, whence)
504 516 else:
505 517 raise NotImplementedError(_('File pointer is not seekable'))
506 518
507 519 def tell(self):
508 520 """return the file offset, or None if file is not seekable"""
509 521 if self._seekable:
510 522 try:
511 523 return self._fp.tell()
512 524 except IOError, e:
513 525 if e.errno == errno.ESPIPE:
514 526 self._seekable = False
515 527 else:
516 528 raise
517 529 return None
518 530
519 531 def close(self):
520 532 """close underlying file"""
521 533 if util.safehasattr(self._fp, 'close'):
522 534 return self._fp.close()
523 535
524 536 def getunbundler(ui, fp, header=None):
525 537 """return a valid unbundler object for a given header"""
526 538 if header is None:
527 539 header = changegroup.readexactly(fp, 4)
528 540 magic, version = header[0:2], header[2:4]
529 541 if magic != 'HG':
530 542 raise util.Abort(_('not a Mercurial bundle'))
531 543 unbundlerclass = formatmap.get(version)
532 544 if unbundlerclass is None:
533 545 raise util.Abort(_('unknown bundle version %s') % version)
534 546 unbundler = unbundlerclass(ui, fp)
535 547 ui.debug('start processing of %s stream\n' % header)
536 548 return unbundler
537 549
538 550 class unbundle20(unpackermixin):
539 551 """interpret a bundle2 stream
540 552
541 553 This class is fed with a binary stream and yields parts through its
542 554 `iterparts` methods."""
543 555
544 556 def __init__(self, ui, fp):
545 557 """If header is specified, we do not read it out of the stream."""
546 558 self.ui = ui
547 559 super(unbundle20, self).__init__(fp)
548 560
549 561 @util.propertycache
550 562 def params(self):
551 563 """dictionary of stream level parameters"""
552 564 self.ui.debug('reading bundle2 stream parameters\n')
553 565 params = {}
554 566 paramssize = self._unpack(_fstreamparamsize)[0]
555 567 if paramssize < 0:
556 568 raise error.BundleValueError('negative bundle param size: %i'
557 569 % paramssize)
558 570 if paramssize:
559 571 for p in self._readexact(paramssize).split(' '):
560 572 p = p.split('=', 1)
561 573 p = [urllib.unquote(i) for i in p]
562 574 if len(p) < 2:
563 575 p.append(None)
564 576 self._processparam(*p)
565 577 params[p[0]] = p[1]
566 578 return params
567 579
568 580 def _processparam(self, name, value):
569 581 """process a parameter, applying its effect if needed
570 582
571 583 Parameter starting with a lower case letter are advisory and will be
572 584 ignored when unknown. Those starting with an upper case letter are
573 585 mandatory and will this function will raise a KeyError when unknown.
574 586
575 587 Note: no option are currently supported. Any input will be either
576 588 ignored or failing.
577 589 """
578 590 if not name:
579 591 raise ValueError('empty parameter name')
580 592 if name[0] not in string.letters:
581 593 raise ValueError('non letter first character: %r' % name)
582 594 # Some logic will be later added here to try to process the option for
583 595 # a dict of known parameter.
584 596 if name[0].islower():
585 597 self.ui.debug("ignoring unknown parameter %r\n" % name)
586 598 else:
587 599 raise error.UnsupportedPartError(params=(name,))
588 600
589 601
590 602 def iterparts(self):
591 603 """yield all parts contained in the stream"""
592 604 # make sure param have been loaded
593 605 self.params
594 606 self.ui.debug('start extraction of bundle2 parts\n')
595 607 headerblock = self._readpartheader()
596 608 while headerblock is not None:
597 609 part = unbundlepart(self.ui, headerblock, self._fp)
598 610 yield part
599 611 part.seek(0, 2)
600 612 headerblock = self._readpartheader()
601 613 self.ui.debug('end of bundle2 stream\n')
602 614
603 615 def _readpartheader(self):
604 616 """reads a part header size and return the bytes blob
605 617
606 618 returns None if empty"""
607 619 headersize = self._unpack(_fpartheadersize)[0]
608 620 if headersize < 0:
609 621 raise error.BundleValueError('negative part header size: %i'
610 622 % headersize)
611 623 self.ui.debug('part header size: %i\n' % headersize)
612 624 if headersize:
613 625 return self._readexact(headersize)
614 626 return None
615 627
616 628 def compressed(self):
617 629 return False
618 630
619 631 formatmap = {'20': unbundle20}
620 632
621 633 class bundlepart(object):
622 634 """A bundle2 part contains application level payload
623 635
624 636 The part `type` is used to route the part to the application level
625 637 handler.
626 638
627 639 The part payload is contained in ``part.data``. It could be raw bytes or a
628 640 generator of byte chunks.
629 641
630 642 You can add parameters to the part using the ``addparam`` method.
631 643 Parameters can be either mandatory (default) or advisory. Remote side
632 644 should be able to safely ignore the advisory ones.
633 645
634 646 Both data and parameters cannot be modified after the generation has begun.
635 647 """
636 648
637 649 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
638 650 data='', mandatory=True):
639 651 validateparttype(parttype)
640 652 self.id = None
641 653 self.type = parttype
642 654 self._data = data
643 655 self._mandatoryparams = list(mandatoryparams)
644 656 self._advisoryparams = list(advisoryparams)
645 657 # checking for duplicated entries
646 658 self._seenparams = set()
647 659 for pname, __ in self._mandatoryparams + self._advisoryparams:
648 660 if pname in self._seenparams:
649 661 raise RuntimeError('duplicated params: %s' % pname)
650 662 self._seenparams.add(pname)
651 663 # status of the part's generation:
652 664 # - None: not started,
653 665 # - False: currently generated,
654 666 # - True: generation done.
655 667 self._generated = None
656 668 self.mandatory = mandatory
657 669
658 670 def copy(self):
659 671 """return a copy of the part
660 672
661 673 The new part have the very same content but no partid assigned yet.
662 674 Parts with generated data cannot be copied."""
663 675 assert not util.safehasattr(self.data, 'next')
664 676 return self.__class__(self.type, self._mandatoryparams,
665 677 self._advisoryparams, self._data, self.mandatory)
666 678
667 679 # methods used to defines the part content
668 680 def __setdata(self, data):
669 681 if self._generated is not None:
670 682 raise error.ReadOnlyPartError('part is being generated')
671 683 self._data = data
672 684 def __getdata(self):
673 685 return self._data
674 686 data = property(__getdata, __setdata)
675 687
676 688 @property
677 689 def mandatoryparams(self):
678 690 # make it an immutable tuple to force people through ``addparam``
679 691 return tuple(self._mandatoryparams)
680 692
681 693 @property
682 694 def advisoryparams(self):
683 695 # make it an immutable tuple to force people through ``addparam``
684 696 return tuple(self._advisoryparams)
685 697
686 698 def addparam(self, name, value='', mandatory=True):
687 699 if self._generated is not None:
688 700 raise error.ReadOnlyPartError('part is being generated')
689 701 if name in self._seenparams:
690 702 raise ValueError('duplicated params: %s' % name)
691 703 self._seenparams.add(name)
692 704 params = self._advisoryparams
693 705 if mandatory:
694 706 params = self._mandatoryparams
695 707 params.append((name, value))
696 708
697 709 # methods used to generates the bundle2 stream
698 710 def getchunks(self):
699 711 if self._generated is not None:
700 712 raise RuntimeError('part can only be consumed once')
701 713 self._generated = False
702 714 #### header
703 715 if self.mandatory:
704 716 parttype = self.type.upper()
705 717 else:
706 718 parttype = self.type.lower()
707 719 ## parttype
708 720 header = [_pack(_fparttypesize, len(parttype)),
709 721 parttype, _pack(_fpartid, self.id),
710 722 ]
711 723 ## parameters
712 724 # count
713 725 manpar = self.mandatoryparams
714 726 advpar = self.advisoryparams
715 727 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
716 728 # size
717 729 parsizes = []
718 730 for key, value in manpar:
719 731 parsizes.append(len(key))
720 732 parsizes.append(len(value))
721 733 for key, value in advpar:
722 734 parsizes.append(len(key))
723 735 parsizes.append(len(value))
724 736 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
725 737 header.append(paramsizes)
726 738 # key, value
727 739 for key, value in manpar:
728 740 header.append(key)
729 741 header.append(value)
730 742 for key, value in advpar:
731 743 header.append(key)
732 744 header.append(value)
733 745 ## finalize header
734 746 headerchunk = ''.join(header)
735 747 yield _pack(_fpartheadersize, len(headerchunk))
736 748 yield headerchunk
737 749 ## payload
738 750 try:
739 751 for chunk in self._payloadchunks():
740 752 yield _pack(_fpayloadsize, len(chunk))
741 753 yield chunk
742 754 except Exception, exc:
743 755 # backup exception data for later
744 756 exc_info = sys.exc_info()
745 757 msg = 'unexpected error: %s' % exc
746 758 interpart = bundlepart('error:abort', [('message', msg)],
747 759 mandatory=False)
748 760 interpart.id = 0
749 761 yield _pack(_fpayloadsize, -1)
750 762 for chunk in interpart.getchunks():
751 763 yield chunk
752 764 # abort current part payload
753 765 yield _pack(_fpayloadsize, 0)
754 766 raise exc_info[0], exc_info[1], exc_info[2]
755 767 # end of payload
756 768 yield _pack(_fpayloadsize, 0)
757 769 self._generated = True
758 770
759 771 def _payloadchunks(self):
760 772 """yield chunks of a the part payload
761 773
762 774 Exists to handle the different methods to provide data to a part."""
763 775 # we only support fixed size data now.
764 776 # This will be improved in the future.
765 777 if util.safehasattr(self.data, 'next'):
766 778 buff = util.chunkbuffer(self.data)
767 779 chunk = buff.read(preferedchunksize)
768 780 while chunk:
769 781 yield chunk
770 782 chunk = buff.read(preferedchunksize)
771 783 elif len(self.data):
772 784 yield self.data
773 785
774 786
775 787 flaginterrupt = -1
776 788
777 789 class interrupthandler(unpackermixin):
778 790 """read one part and process it with restricted capability
779 791
780 792 This allows to transmit exception raised on the producer size during part
781 793 iteration while the consumer is reading a part.
782 794
783 795 Part processed in this manner only have access to a ui object,"""
784 796
785 797 def __init__(self, ui, fp):
786 798 super(interrupthandler, self).__init__(fp)
787 799 self.ui = ui
788 800
789 801 def _readpartheader(self):
790 802 """reads a part header size and return the bytes blob
791 803
792 804 returns None if empty"""
793 805 headersize = self._unpack(_fpartheadersize)[0]
794 806 if headersize < 0:
795 807 raise error.BundleValueError('negative part header size: %i'
796 808 % headersize)
797 809 self.ui.debug('part header size: %i\n' % headersize)
798 810 if headersize:
799 811 return self._readexact(headersize)
800 812 return None
801 813
802 814 def __call__(self):
803 815 self.ui.debug('bundle2 stream interruption, looking for a part.\n')
804 816 headerblock = self._readpartheader()
805 817 if headerblock is None:
806 818 self.ui.debug('no part found during interruption.\n')
807 819 return
808 820 part = unbundlepart(self.ui, headerblock, self._fp)
809 821 op = interruptoperation(self.ui)
810 822 _processpart(op, part)
811 823
812 824 class interruptoperation(object):
813 825 """A limited operation to be use by part handler during interruption
814 826
815 827 It only have access to an ui object.
816 828 """
817 829
818 830 def __init__(self, ui):
819 831 self.ui = ui
820 832 self.reply = None
821 833
822 834 @property
823 835 def repo(self):
824 836 raise RuntimeError('no repo access from stream interruption')
825 837
826 838 def gettransaction(self):
827 839 raise TransactionUnavailable('no repo access from stream interruption')
828 840
829 841 class unbundlepart(unpackermixin):
830 842 """a bundle part read from a bundle"""
831 843
832 844 def __init__(self, ui, header, fp):
833 845 super(unbundlepart, self).__init__(fp)
834 846 self.ui = ui
835 847 # unbundle state attr
836 848 self._headerdata = header
837 849 self._headeroffset = 0
838 850 self._initialized = False
839 851 self.consumed = False
840 852 # part data
841 853 self.id = None
842 854 self.type = None
843 855 self.mandatoryparams = None
844 856 self.advisoryparams = None
845 857 self.params = None
846 858 self.mandatorykeys = ()
847 859 self._payloadstream = None
848 860 self._readheader()
849 861 self._mandatory = None
850 862 self._chunkindex = [] #(payload, file) position tuples for chunk starts
851 863 self._pos = 0
852 864
853 865 def _fromheader(self, size):
854 866 """return the next <size> byte from the header"""
855 867 offset = self._headeroffset
856 868 data = self._headerdata[offset:(offset + size)]
857 869 self._headeroffset = offset + size
858 870 return data
859 871
860 872 def _unpackheader(self, format):
861 873 """read given format from header
862 874
863 875 This automatically compute the size of the format to read."""
864 876 data = self._fromheader(struct.calcsize(format))
865 877 return _unpack(format, data)
866 878
867 879 def _initparams(self, mandatoryparams, advisoryparams):
868 880 """internal function to setup all logic related parameters"""
869 881 # make it read only to prevent people touching it by mistake.
870 882 self.mandatoryparams = tuple(mandatoryparams)
871 883 self.advisoryparams = tuple(advisoryparams)
872 884 # user friendly UI
873 885 self.params = dict(self.mandatoryparams)
874 886 self.params.update(dict(self.advisoryparams))
875 887 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
876 888
877 889 def _payloadchunks(self, chunknum=0):
878 890 '''seek to specified chunk and start yielding data'''
879 891 if len(self._chunkindex) == 0:
880 892 assert chunknum == 0, 'Must start with chunk 0'
881 893 self._chunkindex.append((0, super(unbundlepart, self).tell()))
882 894 else:
883 895 assert chunknum < len(self._chunkindex), \
884 896 'Unknown chunk %d' % chunknum
885 897 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
886 898
887 899 pos = self._chunkindex[chunknum][0]
888 900 payloadsize = self._unpack(_fpayloadsize)[0]
889 901 self.ui.debug('payload chunk size: %i\n' % payloadsize)
890 902 while payloadsize:
891 903 if payloadsize == flaginterrupt:
892 904 # interruption detection, the handler will now read a
893 905 # single part and process it.
894 906 interrupthandler(self.ui, self._fp)()
895 907 elif payloadsize < 0:
896 908 msg = 'negative payload chunk size: %i' % payloadsize
897 909 raise error.BundleValueError(msg)
898 910 else:
899 911 result = self._readexact(payloadsize)
900 912 chunknum += 1
901 913 pos += payloadsize
902 914 if chunknum == len(self._chunkindex):
903 915 self._chunkindex.append((pos,
904 916 super(unbundlepart, self).tell()))
905 917 yield result
906 918 payloadsize = self._unpack(_fpayloadsize)[0]
907 919 self.ui.debug('payload chunk size: %i\n' % payloadsize)
908 920
909 921 def _findchunk(self, pos):
910 922 '''for a given payload position, return a chunk number and offset'''
911 923 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
912 924 if ppos == pos:
913 925 return chunk, 0
914 926 elif ppos > pos:
915 927 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
916 928 raise ValueError('Unknown chunk')
917 929
918 930 def _readheader(self):
919 931 """read the header and setup the object"""
920 932 typesize = self._unpackheader(_fparttypesize)[0]
921 933 self.type = self._fromheader(typesize)
922 934 self.ui.debug('part type: "%s"\n' % self.type)
923 935 self.id = self._unpackheader(_fpartid)[0]
924 936 self.ui.debug('part id: "%s"\n' % self.id)
925 937 # extract mandatory bit from type
926 938 self.mandatory = (self.type != self.type.lower())
927 939 self.type = self.type.lower()
928 940 ## reading parameters
929 941 # param count
930 942 mancount, advcount = self._unpackheader(_fpartparamcount)
931 943 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
932 944 # param size
933 945 fparamsizes = _makefpartparamsizes(mancount + advcount)
934 946 paramsizes = self._unpackheader(fparamsizes)
935 947 # make it a list of couple again
936 948 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
937 949 # split mandatory from advisory
938 950 mansizes = paramsizes[:mancount]
939 951 advsizes = paramsizes[mancount:]
940 952 # retrieve param value
941 953 manparams = []
942 954 for key, value in mansizes:
943 955 manparams.append((self._fromheader(key), self._fromheader(value)))
944 956 advparams = []
945 957 for key, value in advsizes:
946 958 advparams.append((self._fromheader(key), self._fromheader(value)))
947 959 self._initparams(manparams, advparams)
948 960 ## part payload
949 961 self._payloadstream = util.chunkbuffer(self._payloadchunks())
950 962 # we read the data, tell it
951 963 self._initialized = True
952 964
953 965 def read(self, size=None):
954 966 """read payload data"""
955 967 if not self._initialized:
956 968 self._readheader()
957 969 if size is None:
958 970 data = self._payloadstream.read()
959 971 else:
960 972 data = self._payloadstream.read(size)
961 973 if size is None or len(data) < size:
962 974 self.consumed = True
963 975 self._pos += len(data)
964 976 return data
965 977
966 978 def tell(self):
967 979 return self._pos
968 980
969 981 def seek(self, offset, whence=0):
970 982 if whence == 0:
971 983 newpos = offset
972 984 elif whence == 1:
973 985 newpos = self._pos + offset
974 986 elif whence == 2:
975 987 if not self.consumed:
976 988 self.read()
977 989 newpos = self._chunkindex[-1][0] - offset
978 990 else:
979 991 raise ValueError('Unknown whence value: %r' % (whence,))
980 992
981 993 if newpos > self._chunkindex[-1][0] and not self.consumed:
982 994 self.read()
983 995 if not 0 <= newpos <= self._chunkindex[-1][0]:
984 996 raise ValueError('Offset out of range')
985 997
986 998 if self._pos != newpos:
987 999 chunk, internaloffset = self._findchunk(newpos)
988 1000 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
989 1001 adjust = self.read(internaloffset)
990 1002 if len(adjust) != internaloffset:
991 1003 raise util.Abort(_('Seek failed\n'))
992 1004 self._pos = newpos
993 1005
994 1006 capabilities = {'HG20': (),
995 1007 'listkeys': (),
996 1008 'pushkey': (),
997 1009 'digests': tuple(sorted(util.DIGESTS.keys())),
998 1010 'remote-changegroup': ('http', 'https'),
999 1011 }
1000 1012
1001 1013 def getrepocaps(repo, allowpushback=False):
1002 1014 """return the bundle2 capabilities for a given repo
1003 1015
1004 1016 Exists to allow extensions (like evolution) to mutate the capabilities.
1005 1017 """
1006 1018 caps = capabilities.copy()
1007 1019 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1008 1020 if obsolete.isenabled(repo, obsolete.exchangeopt):
1009 1021 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1010 1022 caps['obsmarkers'] = supportedformat
1011 1023 if allowpushback:
1012 1024 caps['pushback'] = ()
1013 1025 return caps
1014 1026
1015 1027 def bundle2caps(remote):
1016 1028 """return the bundle capabilities of a peer as dict"""
1017 1029 raw = remote.capable('bundle2')
1018 1030 if not raw and raw != '':
1019 1031 return {}
1020 1032 capsblob = urllib.unquote(remote.capable('bundle2'))
1021 1033 return decodecaps(capsblob)
1022 1034
1023 1035 def obsmarkersversion(caps):
1024 1036 """extract the list of supported obsmarkers versions from a bundle2caps dict
1025 1037 """
1026 1038 obscaps = caps.get('obsmarkers', ())
1027 1039 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1028 1040
1029 1041 @parthandler('changegroup', ('version',))
1030 1042 def handlechangegroup(op, inpart):
1031 1043 """apply a changegroup part on the repo
1032 1044
1033 1045 This is a very early implementation that will massive rework before being
1034 1046 inflicted to any end-user.
1035 1047 """
1036 1048 # Make sure we trigger a transaction creation
1037 1049 #
1038 1050 # The addchangegroup function will get a transaction object by itself, but
1039 1051 # we need to make sure we trigger the creation of a transaction object used
1040 1052 # for the whole processing scope.
1041 1053 op.gettransaction()
1042 1054 unpackerversion = inpart.params.get('version', '01')
1043 1055 # We should raise an appropriate exception here
1044 1056 unpacker = changegroup.packermap[unpackerversion][1]
1045 1057 cg = unpacker(inpart, 'UN')
1046 1058 # the source and url passed here are overwritten by the one contained in
1047 1059 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1048 1060 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1049 1061 op.records.add('changegroup', {'return': ret})
1050 1062 if op.reply is not None:
1051 1063 # This is definitely not the final form of this
1052 1064 # return. But one need to start somewhere.
1053 1065 part = op.reply.newpart('reply:changegroup', mandatory=False)
1054 1066 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1055 1067 part.addparam('return', '%i' % ret, mandatory=False)
1056 1068 assert not inpart.read()
1057 1069
1058 1070 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1059 1071 ['digest:%s' % k for k in util.DIGESTS.keys()])
1060 1072 @parthandler('remote-changegroup', _remotechangegroupparams)
1061 1073 def handleremotechangegroup(op, inpart):
1062 1074 """apply a bundle10 on the repo, given an url and validation information
1063 1075
1064 1076 All the information about the remote bundle to import are given as
1065 1077 parameters. The parameters include:
1066 1078 - url: the url to the bundle10.
1067 1079 - size: the bundle10 file size. It is used to validate what was
1068 1080 retrieved by the client matches the server knowledge about the bundle.
1069 1081 - digests: a space separated list of the digest types provided as
1070 1082 parameters.
1071 1083 - digest:<digest-type>: the hexadecimal representation of the digest with
1072 1084 that name. Like the size, it is used to validate what was retrieved by
1073 1085 the client matches what the server knows about the bundle.
1074 1086
1075 1087 When multiple digest types are given, all of them are checked.
1076 1088 """
1077 1089 try:
1078 1090 raw_url = inpart.params['url']
1079 1091 except KeyError:
1080 1092 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1081 1093 parsed_url = util.url(raw_url)
1082 1094 if parsed_url.scheme not in capabilities['remote-changegroup']:
1083 1095 raise util.Abort(_('remote-changegroup does not support %s urls') %
1084 1096 parsed_url.scheme)
1085 1097
1086 1098 try:
1087 1099 size = int(inpart.params['size'])
1088 1100 except ValueError:
1089 1101 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1090 1102 % 'size')
1091 1103 except KeyError:
1092 1104 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1093 1105
1094 1106 digests = {}
1095 1107 for typ in inpart.params.get('digests', '').split():
1096 1108 param = 'digest:%s' % typ
1097 1109 try:
1098 1110 value = inpart.params[param]
1099 1111 except KeyError:
1100 1112 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1101 1113 param)
1102 1114 digests[typ] = value
1103 1115
1104 1116 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1105 1117
1106 1118 # Make sure we trigger a transaction creation
1107 1119 #
1108 1120 # The addchangegroup function will get a transaction object by itself, but
1109 1121 # we need to make sure we trigger the creation of a transaction object used
1110 1122 # for the whole processing scope.
1111 1123 op.gettransaction()
1112 1124 import exchange
1113 1125 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1114 1126 if not isinstance(cg, changegroup.cg1unpacker):
1115 1127 raise util.Abort(_('%s: not a bundle version 1.0') %
1116 1128 util.hidepassword(raw_url))
1117 1129 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1118 1130 op.records.add('changegroup', {'return': ret})
1119 1131 if op.reply is not None:
1120 1132 # This is definitely not the final form of this
1121 1133 # return. But one need to start somewhere.
1122 1134 part = op.reply.newpart('reply:changegroup')
1123 1135 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1124 1136 part.addparam('return', '%i' % ret, mandatory=False)
1125 1137 try:
1126 1138 real_part.validate()
1127 1139 except util.Abort, e:
1128 1140 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1129 1141 (util.hidepassword(raw_url), str(e)))
1130 1142 assert not inpart.read()
1131 1143
1132 1144 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1133 1145 def handlereplychangegroup(op, inpart):
1134 1146 ret = int(inpart.params['return'])
1135 1147 replyto = int(inpart.params['in-reply-to'])
1136 1148 op.records.add('changegroup', {'return': ret}, replyto)
1137 1149
1138 1150 @parthandler('check:heads')
1139 1151 def handlecheckheads(op, inpart):
1140 1152 """check that head of the repo did not change
1141 1153
1142 1154 This is used to detect a push race when using unbundle.
1143 1155 This replaces the "heads" argument of unbundle."""
1144 1156 h = inpart.read(20)
1145 1157 heads = []
1146 1158 while len(h) == 20:
1147 1159 heads.append(h)
1148 1160 h = inpart.read(20)
1149 1161 assert not h
1150 1162 if heads != op.repo.heads():
1151 1163 raise error.PushRaced('repository changed while pushing - '
1152 1164 'please try again')
1153 1165
1154 1166 @parthandler('output')
1155 1167 def handleoutput(op, inpart):
1156 1168 """forward output captured on the server to the client"""
1157 1169 for line in inpart.read().splitlines():
1158 1170 op.ui.write(('remote: %s\n' % line))
1159 1171
1160 1172 @parthandler('replycaps')
1161 1173 def handlereplycaps(op, inpart):
1162 1174 """Notify that a reply bundle should be created
1163 1175
1164 1176 The payload contains the capabilities information for the reply"""
1165 1177 caps = decodecaps(inpart.read())
1166 1178 if op.reply is None:
1167 1179 op.reply = bundle20(op.ui, caps)
1168 1180
1169 1181 @parthandler('error:abort', ('message', 'hint'))
1170 1182 def handleerrorabort(op, inpart):
1171 1183 """Used to transmit abort error over the wire"""
1172 1184 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1173 1185
1174 1186 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1175 1187 def handleerrorunsupportedcontent(op, inpart):
1176 1188 """Used to transmit unknown content error over the wire"""
1177 1189 kwargs = {}
1178 1190 parttype = inpart.params.get('parttype')
1179 1191 if parttype is not None:
1180 1192 kwargs['parttype'] = parttype
1181 1193 params = inpart.params.get('params')
1182 1194 if params is not None:
1183 1195 kwargs['params'] = params.split('\0')
1184 1196
1185 1197 raise error.UnsupportedPartError(**kwargs)
1186 1198
1187 1199 @parthandler('error:pushraced', ('message',))
1188 1200 def handleerrorpushraced(op, inpart):
1189 1201 """Used to transmit push race error over the wire"""
1190 1202 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1191 1203
1192 1204 @parthandler('listkeys', ('namespace',))
1193 1205 def handlelistkeys(op, inpart):
1194 1206 """retrieve pushkey namespace content stored in a bundle2"""
1195 1207 namespace = inpart.params['namespace']
1196 1208 r = pushkey.decodekeys(inpart.read())
1197 1209 op.records.add('listkeys', (namespace, r))
1198 1210
1199 1211 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1200 1212 def handlepushkey(op, inpart):
1201 1213 """process a pushkey request"""
1202 1214 dec = pushkey.decode
1203 1215 namespace = dec(inpart.params['namespace'])
1204 1216 key = dec(inpart.params['key'])
1205 1217 old = dec(inpart.params['old'])
1206 1218 new = dec(inpart.params['new'])
1207 1219 ret = op.repo.pushkey(namespace, key, old, new)
1208 1220 record = {'namespace': namespace,
1209 1221 'key': key,
1210 1222 'old': old,
1211 1223 'new': new}
1212 1224 op.records.add('pushkey', record)
1213 1225 if op.reply is not None:
1214 1226 rpart = op.reply.newpart('reply:pushkey')
1215 1227 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1216 1228 rpart.addparam('return', '%i' % ret, mandatory=False)
1217 1229
1218 1230 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1219 1231 def handlepushkeyreply(op, inpart):
1220 1232 """retrieve the result of a pushkey request"""
1221 1233 ret = int(inpart.params['return'])
1222 1234 partid = int(inpart.params['in-reply-to'])
1223 1235 op.records.add('pushkey', {'return': ret}, partid)
1224 1236
1225 1237 @parthandler('obsmarkers')
1226 1238 def handleobsmarker(op, inpart):
1227 1239 """add a stream of obsmarkers to the repo"""
1228 1240 tr = op.gettransaction()
1229 1241 markerdata = inpart.read()
1230 1242 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1231 1243 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1232 1244 % len(markerdata))
1233 1245 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1234 1246 if new:
1235 1247 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1236 1248 op.records.add('obsmarkers', {'new': new})
1237 1249 if op.reply is not None:
1238 1250 rpart = op.reply.newpart('reply:obsmarkers')
1239 1251 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1240 1252 rpart.addparam('new', '%i' % new, mandatory=False)
1241 1253
1242 1254
1243 1255 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1244 1256 def handlepushkeyreply(op, inpart):
1245 1257 """retrieve the result of a pushkey request"""
1246 1258 ret = int(inpart.params['new'])
1247 1259 partid = int(inpart.params['in-reply-to'])
1248 1260 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now