##// END OF EJS Templates
bundle2: extract processing of part into its own function...
Pierre-Yves David -
r23008:d3137827 default
parent child Browse files
Show More
@@ -1,948 +1,953 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: (16 bits integer)
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: (16 bits inter)
68 68
69 69 The total number of Bytes used by the part headers. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 Bundle processing
129 129 ============================
130 130
131 131 Each part is processed in order using a "part handler". Handler are registered
132 132 for a certain part type.
133 133
134 134 The matching of a part to its handler is case insensitive. The case of the
135 135 part type is used to know if a part is mandatory or advisory. If the Part type
136 136 contains any uppercase char it is considered mandatory. When no handler is
137 137 known for a Mandatory part, the process is aborted and an exception is raised.
138 138 If the part is advisory and no handler is known, the part is ignored. When the
139 139 process is aborted, the full bundle is still read from the stream to keep the
140 140 channel usable. But none of the part read from an abort are processed. In the
141 141 future, dropping the stream may become an option for channel we do not care to
142 142 preserve.
143 143 """
144 144
145 145 import util
146 146 import struct
147 147 import urllib
148 148 import string
149 149 import obsolete
150 150 import pushkey
151 151
152 152 import changegroup, error
153 153 from i18n import _
154 154
155 155 _pack = struct.pack
156 156 _unpack = struct.unpack
157 157
158 158 _magicstring = 'HG2X'
159 159
160 160 _fstreamparamsize = '>H'
161 161 _fpartheadersize = '>H'
162 162 _fparttypesize = '>B'
163 163 _fpartid = '>I'
164 164 _fpayloadsize = '>I'
165 165 _fpartparamcount = '>BB'
166 166
167 167 preferedchunksize = 4096
168 168
169 169 def _makefpartparamsizes(nbparams):
170 170 """return a struct format to read part parameter sizes
171 171
172 172 The number parameters is variable so we need to build that format
173 173 dynamically.
174 174 """
175 175 return '>'+('BB'*nbparams)
176 176
177 177 parthandlermapping = {}
178 178
179 179 def parthandler(parttype, params=()):
180 180 """decorator that register a function as a bundle2 part handler
181 181
182 182 eg::
183 183
184 184 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
185 185 def myparttypehandler(...):
186 186 '''process a part of type "my part".'''
187 187 ...
188 188 """
189 189 def _decorator(func):
190 190 lparttype = parttype.lower() # enforce lower case matching.
191 191 assert lparttype not in parthandlermapping
192 192 parthandlermapping[lparttype] = func
193 193 func.params = frozenset(params)
194 194 return func
195 195 return _decorator
196 196
197 197 class unbundlerecords(object):
198 198 """keep record of what happens during and unbundle
199 199
200 200 New records are added using `records.add('cat', obj)`. Where 'cat' is a
201 201 category of record and obj is an arbitrary object.
202 202
203 203 `records['cat']` will return all entries of this category 'cat'.
204 204
205 205 Iterating on the object itself will yield `('category', obj)` tuples
206 206 for all entries.
207 207
208 208 All iterations happens in chronological order.
209 209 """
210 210
211 211 def __init__(self):
212 212 self._categories = {}
213 213 self._sequences = []
214 214 self._replies = {}
215 215
216 216 def add(self, category, entry, inreplyto=None):
217 217 """add a new record of a given category.
218 218
219 219 The entry can then be retrieved in the list returned by
220 220 self['category']."""
221 221 self._categories.setdefault(category, []).append(entry)
222 222 self._sequences.append((category, entry))
223 223 if inreplyto is not None:
224 224 self.getreplies(inreplyto).add(category, entry)
225 225
226 226 def getreplies(self, partid):
227 227 """get the subrecords that replies to a specific part"""
228 228 return self._replies.setdefault(partid, unbundlerecords())
229 229
230 230 def __getitem__(self, cat):
231 231 return tuple(self._categories.get(cat, ()))
232 232
233 233 def __iter__(self):
234 234 return iter(self._sequences)
235 235
236 236 def __len__(self):
237 237 return len(self._sequences)
238 238
239 239 def __nonzero__(self):
240 240 return bool(self._sequences)
241 241
242 242 class bundleoperation(object):
243 243 """an object that represents a single bundling process
244 244
245 245 Its purpose is to carry unbundle-related objects and states.
246 246
247 247 A new object should be created at the beginning of each bundle processing.
248 248 The object is to be returned by the processing function.
249 249
250 250 The object has very little content now it will ultimately contain:
251 251 * an access to the repo the bundle is applied to,
252 252 * a ui object,
253 253 * a way to retrieve a transaction to add changes to the repo,
254 254 * a way to record the result of processing each part,
255 255 * a way to construct a bundle response when applicable.
256 256 """
257 257
258 258 def __init__(self, repo, transactiongetter):
259 259 self.repo = repo
260 260 self.ui = repo.ui
261 261 self.records = unbundlerecords()
262 262 self.gettransaction = transactiongetter
263 263 self.reply = None
264 264
265 265 class TransactionUnavailable(RuntimeError):
266 266 pass
267 267
268 268 def _notransaction():
269 269 """default method to get a transaction while processing a bundle
270 270
271 271 Raise an exception to highlight the fact that no transaction was expected
272 272 to be created"""
273 273 raise TransactionUnavailable()
274 274
275 275 def processbundle(repo, unbundler, transactiongetter=_notransaction):
276 276 """This function process a bundle, apply effect to/from a repo
277 277
278 278 It iterates over each part then searches for and uses the proper handling
279 279 code to process the part. Parts are processed in order.
280 280
281 281 This is very early version of this function that will be strongly reworked
282 282 before final usage.
283 283
284 284 Unknown Mandatory part will abort the process.
285 285 """
286 286 op = bundleoperation(repo, transactiongetter)
287 287 # todo:
288 288 # - replace this is a init function soon.
289 289 # - exception catching
290 290 unbundler.params
291 291 iterparts = unbundler.iterparts()
292 292 part = None
293 293 try:
294 294 for part in iterparts:
295 parttype = part.type
296 # part key are matched lower case
297 key = parttype.lower()
298 try:
299 handler = parthandlermapping.get(key)
300 if handler is None:
301 raise error.BundleValueError(parttype=key)
302 op.ui.debug('found a handler for part %r\n' % parttype)
303 unknownparams = part.mandatorykeys - handler.params
304 if unknownparams:
305 unknownparams = list(unknownparams)
306 unknownparams.sort()
307 raise error.BundleValueError(parttype=key,
308 params=unknownparams)
309 except error.BundleValueError, exc:
310 if key != parttype: # mandatory parts
311 raise
312 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
313 # consuming the part
314 part.read()
315 continue
316
317
318 # handler is called outside the above try block so that we don't
319 # risk catching KeyErrors from anything other than the
320 # parthandlermapping lookup (any KeyError raised by handler()
321 # itself represents a defect of a different variety).
322 output = None
323 if op.reply is not None:
324 op.ui.pushbuffer(error=True)
325 output = ''
326 try:
327 handler(op, part)
328 finally:
329 if output is not None:
330 output = op.ui.popbuffer()
331 if output:
332 outpart = op.reply.newpart('b2x:output', data=output)
333 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
334 part.read()
295 _processpart(op, part)
335 296 except Exception, exc:
336 if part is not None:
337 # consume the bundle content
338 part.read()
339 297 for part in iterparts:
340 298 # consume the bundle content
341 299 part.read()
342 300 # Small hack to let caller code distinguish exceptions from bundle2
343 301 # processing fron the ones from bundle1 processing. This is mostly
344 302 # needed to handle different return codes to unbundle according to the
345 303 # type of bundle. We should probably clean up or drop this return code
346 304 # craziness in a future version.
347 305 exc.duringunbundle2 = True
348 306 raise
349 307 return op
350 308
309 def _processpart(op, part):
310 """process a single part from a bundle
311
312 The part is guaranteed to have been fully consumed when the function exits
313 (even if an exception is raised)."""
314 try:
315 parttype = part.type
316 # part key are matched lower case
317 key = parttype.lower()
318 try:
319 handler = parthandlermapping.get(key)
320 if handler is None:
321 raise error.BundleValueError(parttype=key)
322 op.ui.debug('found a handler for part %r\n' % parttype)
323 unknownparams = part.mandatorykeys - handler.params
324 if unknownparams:
325 unknownparams = list(unknownparams)
326 unknownparams.sort()
327 raise error.BundleValueError(parttype=key,
328 params=unknownparams)
329 except error.BundleValueError, exc:
330 if key != parttype: # mandatory parts
331 raise
332 op.ui.debug('ignoring unsupported advisory part %s\n' % exc)
333 return # skip to part processing
334
335 # handler is called outside the above try block so that we don't
336 # risk catching KeyErrors from anything other than the
337 # parthandlermapping lookup (any KeyError raised by handler()
338 # itself represents a defect of a different variety).
339 output = None
340 if op.reply is not None:
341 op.ui.pushbuffer(error=True)
342 output = ''
343 try:
344 handler(op, part)
345 finally:
346 if output is not None:
347 output = op.ui.popbuffer()
348 if output:
349 outpart = op.reply.newpart('b2x:output', data=output)
350 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
351 finally:
352 # consume the part content to not corrupt the stream.
353 part.read()
354
355
351 356 def decodecaps(blob):
352 357 """decode a bundle2 caps bytes blob into a dictionnary
353 358
354 359 The blob is a list of capabilities (one per line)
355 360 Capabilities may have values using a line of the form::
356 361
357 362 capability=value1,value2,value3
358 363
359 364 The values are always a list."""
360 365 caps = {}
361 366 for line in blob.splitlines():
362 367 if not line:
363 368 continue
364 369 if '=' not in line:
365 370 key, vals = line, ()
366 371 else:
367 372 key, vals = line.split('=', 1)
368 373 vals = vals.split(',')
369 374 key = urllib.unquote(key)
370 375 vals = [urllib.unquote(v) for v in vals]
371 376 caps[key] = vals
372 377 return caps
373 378
374 379 def encodecaps(caps):
375 380 """encode a bundle2 caps dictionary into a bytes blob"""
376 381 chunks = []
377 382 for ca in sorted(caps):
378 383 vals = caps[ca]
379 384 ca = urllib.quote(ca)
380 385 vals = [urllib.quote(v) for v in vals]
381 386 if vals:
382 387 ca = "%s=%s" % (ca, ','.join(vals))
383 388 chunks.append(ca)
384 389 return '\n'.join(chunks)
385 390
386 391 class bundle20(object):
387 392 """represent an outgoing bundle2 container
388 393
389 394 Use the `addparam` method to add stream level parameter. and `newpart` to
390 395 populate it. Then call `getchunks` to retrieve all the binary chunks of
391 396 data that compose the bundle2 container."""
392 397
393 398 def __init__(self, ui, capabilities=()):
394 399 self.ui = ui
395 400 self._params = []
396 401 self._parts = []
397 402 self.capabilities = dict(capabilities)
398 403
399 404 @property
400 405 def nbparts(self):
401 406 """total number of parts added to the bundler"""
402 407 return len(self._parts)
403 408
404 409 # methods used to defines the bundle2 content
405 410 def addparam(self, name, value=None):
406 411 """add a stream level parameter"""
407 412 if not name:
408 413 raise ValueError('empty parameter name')
409 414 if name[0] not in string.letters:
410 415 raise ValueError('non letter first character: %r' % name)
411 416 self._params.append((name, value))
412 417
413 418 def addpart(self, part):
414 419 """add a new part to the bundle2 container
415 420
416 421 Parts contains the actual applicative payload."""
417 422 assert part.id is None
418 423 part.id = len(self._parts) # very cheap counter
419 424 self._parts.append(part)
420 425
421 426 def newpart(self, typeid, *args, **kwargs):
422 427 """create a new part and add it to the containers
423 428
424 429 As the part is directly added to the containers. For now, this means
425 430 that any failure to properly initialize the part after calling
426 431 ``newpart`` should result in a failure of the whole bundling process.
427 432
428 433 You can still fall back to manually create and add if you need better
429 434 control."""
430 435 part = bundlepart(typeid, *args, **kwargs)
431 436 self.addpart(part)
432 437 return part
433 438
434 439 # methods used to generate the bundle2 stream
435 440 def getchunks(self):
436 441 self.ui.debug('start emission of %s stream\n' % _magicstring)
437 442 yield _magicstring
438 443 param = self._paramchunk()
439 444 self.ui.debug('bundle parameter: %s\n' % param)
440 445 yield _pack(_fstreamparamsize, len(param))
441 446 if param:
442 447 yield param
443 448
444 449 self.ui.debug('start of parts\n')
445 450 for part in self._parts:
446 451 self.ui.debug('bundle part: "%s"\n' % part.type)
447 452 for chunk in part.getchunks():
448 453 yield chunk
449 454 self.ui.debug('end of bundle\n')
450 455 yield _pack(_fpartheadersize, 0)
451 456
452 457 def _paramchunk(self):
453 458 """return a encoded version of all stream parameters"""
454 459 blocks = []
455 460 for par, value in self._params:
456 461 par = urllib.quote(par)
457 462 if value is not None:
458 463 value = urllib.quote(value)
459 464 par = '%s=%s' % (par, value)
460 465 blocks.append(par)
461 466 return ' '.join(blocks)
462 467
463 468 class unpackermixin(object):
464 469 """A mixin to extract bytes and struct data from a stream"""
465 470
466 471 def __init__(self, fp):
467 472 self._fp = fp
468 473
469 474 def _unpack(self, format):
470 475 """unpack this struct format from the stream"""
471 476 data = self._readexact(struct.calcsize(format))
472 477 return _unpack(format, data)
473 478
474 479 def _readexact(self, size):
475 480 """read exactly <size> bytes from the stream"""
476 481 return changegroup.readexactly(self._fp, size)
477 482
478 483
479 484 class unbundle20(unpackermixin):
480 485 """interpret a bundle2 stream
481 486
482 487 This class is fed with a binary stream and yields parts through its
483 488 `iterparts` methods."""
484 489
485 490 def __init__(self, ui, fp, header=None):
486 491 """If header is specified, we do not read it out of the stream."""
487 492 self.ui = ui
488 493 super(unbundle20, self).__init__(fp)
489 494 if header is None:
490 495 header = self._readexact(4)
491 496 magic, version = header[0:2], header[2:4]
492 497 if magic != 'HG':
493 498 raise util.Abort(_('not a Mercurial bundle'))
494 499 if version != '2X':
495 500 raise util.Abort(_('unknown bundle version %s') % version)
496 501 self.ui.debug('start processing of %s stream\n' % header)
497 502
498 503 @util.propertycache
499 504 def params(self):
500 505 """dictionary of stream level parameters"""
501 506 self.ui.debug('reading bundle2 stream parameters\n')
502 507 params = {}
503 508 paramssize = self._unpack(_fstreamparamsize)[0]
504 509 if paramssize:
505 510 for p in self._readexact(paramssize).split(' '):
506 511 p = p.split('=', 1)
507 512 p = [urllib.unquote(i) for i in p]
508 513 if len(p) < 2:
509 514 p.append(None)
510 515 self._processparam(*p)
511 516 params[p[0]] = p[1]
512 517 return params
513 518
514 519 def _processparam(self, name, value):
515 520 """process a parameter, applying its effect if needed
516 521
517 522 Parameter starting with a lower case letter are advisory and will be
518 523 ignored when unknown. Those starting with an upper case letter are
519 524 mandatory and will this function will raise a KeyError when unknown.
520 525
521 526 Note: no option are currently supported. Any input will be either
522 527 ignored or failing.
523 528 """
524 529 if not name:
525 530 raise ValueError('empty parameter name')
526 531 if name[0] not in string.letters:
527 532 raise ValueError('non letter first character: %r' % name)
528 533 # Some logic will be later added here to try to process the option for
529 534 # a dict of known parameter.
530 535 if name[0].islower():
531 536 self.ui.debug("ignoring unknown parameter %r\n" % name)
532 537 else:
533 538 raise error.BundleValueError(params=(name,))
534 539
535 540
536 541 def iterparts(self):
537 542 """yield all parts contained in the stream"""
538 543 # make sure param have been loaded
539 544 self.params
540 545 self.ui.debug('start extraction of bundle2 parts\n')
541 546 headerblock = self._readpartheader()
542 547 while headerblock is not None:
543 548 part = unbundlepart(self.ui, headerblock, self._fp)
544 549 yield part
545 550 headerblock = self._readpartheader()
546 551 self.ui.debug('end of bundle2 stream\n')
547 552
548 553 def _readpartheader(self):
549 554 """reads a part header size and return the bytes blob
550 555
551 556 returns None if empty"""
552 557 headersize = self._unpack(_fpartheadersize)[0]
553 558 self.ui.debug('part header size: %i\n' % headersize)
554 559 if headersize:
555 560 return self._readexact(headersize)
556 561 return None
557 562
558 563
559 564 class bundlepart(object):
560 565 """A bundle2 part contains application level payload
561 566
562 567 The part `type` is used to route the part to the application level
563 568 handler.
564 569
565 570 The part payload is contained in ``part.data``. It could be raw bytes or a
566 571 generator of byte chunks.
567 572
568 573 You can add parameters to the part using the ``addparam`` method.
569 574 Parameters can be either mandatory (default) or advisory. Remote side
570 575 should be able to safely ignore the advisory ones.
571 576
572 577 Both data and parameters cannot be modified after the generation has begun.
573 578 """
574 579
575 580 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
576 581 data=''):
577 582 self.id = None
578 583 self.type = parttype
579 584 self._data = data
580 585 self._mandatoryparams = list(mandatoryparams)
581 586 self._advisoryparams = list(advisoryparams)
582 587 # checking for duplicated entries
583 588 self._seenparams = set()
584 589 for pname, __ in self._mandatoryparams + self._advisoryparams:
585 590 if pname in self._seenparams:
586 591 raise RuntimeError('duplicated params: %s' % pname)
587 592 self._seenparams.add(pname)
588 593 # status of the part's generation:
589 594 # - None: not started,
590 595 # - False: currently generated,
591 596 # - True: generation done.
592 597 self._generated = None
593 598
594 599 # methods used to defines the part content
595 600 def __setdata(self, data):
596 601 if self._generated is not None:
597 602 raise error.ReadOnlyPartError('part is being generated')
598 603 self._data = data
599 604 def __getdata(self):
600 605 return self._data
601 606 data = property(__getdata, __setdata)
602 607
603 608 @property
604 609 def mandatoryparams(self):
605 610 # make it an immutable tuple to force people through ``addparam``
606 611 return tuple(self._mandatoryparams)
607 612
608 613 @property
609 614 def advisoryparams(self):
610 615 # make it an immutable tuple to force people through ``addparam``
611 616 return tuple(self._advisoryparams)
612 617
613 618 def addparam(self, name, value='', mandatory=True):
614 619 if self._generated is not None:
615 620 raise error.ReadOnlyPartError('part is being generated')
616 621 if name in self._seenparams:
617 622 raise ValueError('duplicated params: %s' % name)
618 623 self._seenparams.add(name)
619 624 params = self._advisoryparams
620 625 if mandatory:
621 626 params = self._mandatoryparams
622 627 params.append((name, value))
623 628
624 629 # methods used to generates the bundle2 stream
625 630 def getchunks(self):
626 631 if self._generated is not None:
627 632 raise RuntimeError('part can only be consumed once')
628 633 self._generated = False
629 634 #### header
630 635 ## parttype
631 636 header = [_pack(_fparttypesize, len(self.type)),
632 637 self.type, _pack(_fpartid, self.id),
633 638 ]
634 639 ## parameters
635 640 # count
636 641 manpar = self.mandatoryparams
637 642 advpar = self.advisoryparams
638 643 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
639 644 # size
640 645 parsizes = []
641 646 for key, value in manpar:
642 647 parsizes.append(len(key))
643 648 parsizes.append(len(value))
644 649 for key, value in advpar:
645 650 parsizes.append(len(key))
646 651 parsizes.append(len(value))
647 652 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
648 653 header.append(paramsizes)
649 654 # key, value
650 655 for key, value in manpar:
651 656 header.append(key)
652 657 header.append(value)
653 658 for key, value in advpar:
654 659 header.append(key)
655 660 header.append(value)
656 661 ## finalize header
657 662 headerchunk = ''.join(header)
658 663 yield _pack(_fpartheadersize, len(headerchunk))
659 664 yield headerchunk
660 665 ## payload
661 666 for chunk in self._payloadchunks():
662 667 yield _pack(_fpayloadsize, len(chunk))
663 668 yield chunk
664 669 # end of payload
665 670 yield _pack(_fpayloadsize, 0)
666 671 self._generated = True
667 672
668 673 def _payloadchunks(self):
669 674 """yield chunks of a the part payload
670 675
671 676 Exists to handle the different methods to provide data to a part."""
672 677 # we only support fixed size data now.
673 678 # This will be improved in the future.
674 679 if util.safehasattr(self.data, 'next'):
675 680 buff = util.chunkbuffer(self.data)
676 681 chunk = buff.read(preferedchunksize)
677 682 while chunk:
678 683 yield chunk
679 684 chunk = buff.read(preferedchunksize)
680 685 elif len(self.data):
681 686 yield self.data
682 687
683 688 class unbundlepart(unpackermixin):
684 689 """a bundle part read from a bundle"""
685 690
686 691 def __init__(self, ui, header, fp):
687 692 super(unbundlepart, self).__init__(fp)
688 693 self.ui = ui
689 694 # unbundle state attr
690 695 self._headerdata = header
691 696 self._headeroffset = 0
692 697 self._initialized = False
693 698 self.consumed = False
694 699 # part data
695 700 self.id = None
696 701 self.type = None
697 702 self.mandatoryparams = None
698 703 self.advisoryparams = None
699 704 self.params = None
700 705 self.mandatorykeys = ()
701 706 self._payloadstream = None
702 707 self._readheader()
703 708
704 709 def _fromheader(self, size):
705 710 """return the next <size> byte from the header"""
706 711 offset = self._headeroffset
707 712 data = self._headerdata[offset:(offset + size)]
708 713 self._headeroffset = offset + size
709 714 return data
710 715
711 716 def _unpackheader(self, format):
712 717 """read given format from header
713 718
714 719 This automatically compute the size of the format to read."""
715 720 data = self._fromheader(struct.calcsize(format))
716 721 return _unpack(format, data)
717 722
718 723 def _initparams(self, mandatoryparams, advisoryparams):
719 724 """internal function to setup all logic related parameters"""
720 725 # make it read only to prevent people touching it by mistake.
721 726 self.mandatoryparams = tuple(mandatoryparams)
722 727 self.advisoryparams = tuple(advisoryparams)
723 728 # user friendly UI
724 729 self.params = dict(self.mandatoryparams)
725 730 self.params.update(dict(self.advisoryparams))
726 731 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
727 732
728 733 def _readheader(self):
729 734 """read the header and setup the object"""
730 735 typesize = self._unpackheader(_fparttypesize)[0]
731 736 self.type = self._fromheader(typesize)
732 737 self.ui.debug('part type: "%s"\n' % self.type)
733 738 self.id = self._unpackheader(_fpartid)[0]
734 739 self.ui.debug('part id: "%s"\n' % self.id)
735 740 ## reading parameters
736 741 # param count
737 742 mancount, advcount = self._unpackheader(_fpartparamcount)
738 743 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
739 744 # param size
740 745 fparamsizes = _makefpartparamsizes(mancount + advcount)
741 746 paramsizes = self._unpackheader(fparamsizes)
742 747 # make it a list of couple again
743 748 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
744 749 # split mandatory from advisory
745 750 mansizes = paramsizes[:mancount]
746 751 advsizes = paramsizes[mancount:]
747 752 # retrive param value
748 753 manparams = []
749 754 for key, value in mansizes:
750 755 manparams.append((self._fromheader(key), self._fromheader(value)))
751 756 advparams = []
752 757 for key, value in advsizes:
753 758 advparams.append((self._fromheader(key), self._fromheader(value)))
754 759 self._initparams(manparams, advparams)
755 760 ## part payload
756 761 def payloadchunks():
757 762 payloadsize = self._unpack(_fpayloadsize)[0]
758 763 self.ui.debug('payload chunk size: %i\n' % payloadsize)
759 764 while payloadsize:
760 765 yield self._readexact(payloadsize)
761 766 payloadsize = self._unpack(_fpayloadsize)[0]
762 767 self.ui.debug('payload chunk size: %i\n' % payloadsize)
763 768 self._payloadstream = util.chunkbuffer(payloadchunks())
764 769 # we read the data, tell it
765 770 self._initialized = True
766 771
767 772 def read(self, size=None):
768 773 """read payload data"""
769 774 if not self._initialized:
770 775 self._readheader()
771 776 if size is None:
772 777 data = self._payloadstream.read()
773 778 else:
774 779 data = self._payloadstream.read(size)
775 780 if size is None or len(data) < size:
776 781 self.consumed = True
777 782 return data
778 783
779 784 capabilities = {'HG2X': (),
780 785 'b2x:listkeys': (),
781 786 'b2x:pushkey': (),
782 787 'b2x:changegroup': (),
783 788 }
784 789
785 790 def getrepocaps(repo):
786 791 """return the bundle2 capabilities for a given repo
787 792
788 793 Exists to allow extensions (like evolution) to mutate the capabilities.
789 794 """
790 795 caps = capabilities.copy()
791 796 if obsolete.isenabled(repo, obsolete.exchangeopt):
792 797 supportedformat = tuple('V%i' % v for v in obsolete.formats)
793 798 caps['b2x:obsmarkers'] = supportedformat
794 799 return caps
795 800
796 801 def bundle2caps(remote):
797 802 """return the bundlecapabilities of a peer as dict"""
798 803 raw = remote.capable('bundle2-exp')
799 804 if not raw and raw != '':
800 805 return {}
801 806 capsblob = urllib.unquote(remote.capable('bundle2-exp'))
802 807 return decodecaps(capsblob)
803 808
804 809 def obsmarkersversion(caps):
805 810 """extract the list of supported obsmarkers versions from a bundle2caps dict
806 811 """
807 812 obscaps = caps.get('b2x:obsmarkers', ())
808 813 return [int(c[1:]) for c in obscaps if c.startswith('V')]
809 814
810 815 @parthandler('b2x:changegroup')
811 816 def handlechangegroup(op, inpart):
812 817 """apply a changegroup part on the repo
813 818
814 819 This is a very early implementation that will massive rework before being
815 820 inflicted to any end-user.
816 821 """
817 822 # Make sure we trigger a transaction creation
818 823 #
819 824 # The addchangegroup function will get a transaction object by itself, but
820 825 # we need to make sure we trigger the creation of a transaction object used
821 826 # for the whole processing scope.
822 827 op.gettransaction()
823 828 cg = changegroup.cg1unpacker(inpart, 'UN')
824 829 # the source and url passed here are overwritten by the one contained in
825 830 # the transaction.hookargs argument. So 'bundle2' is a placeholder
826 831 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
827 832 op.records.add('changegroup', {'return': ret})
828 833 if op.reply is not None:
829 834 # This is definitly not the final form of this
830 835 # return. But one need to start somewhere.
831 836 part = op.reply.newpart('b2x:reply:changegroup')
832 837 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
833 838 part.addparam('return', '%i' % ret, mandatory=False)
834 839 assert not inpart.read()
835 840
836 841 @parthandler('b2x:reply:changegroup', ('return', 'in-reply-to'))
837 842 def handlereplychangegroup(op, inpart):
838 843 ret = int(inpart.params['return'])
839 844 replyto = int(inpart.params['in-reply-to'])
840 845 op.records.add('changegroup', {'return': ret}, replyto)
841 846
842 847 @parthandler('b2x:check:heads')
843 848 def handlecheckheads(op, inpart):
844 849 """check that head of the repo did not change
845 850
846 851 This is used to detect a push race when using unbundle.
847 852 This replaces the "heads" argument of unbundle."""
848 853 h = inpart.read(20)
849 854 heads = []
850 855 while len(h) == 20:
851 856 heads.append(h)
852 857 h = inpart.read(20)
853 858 assert not h
854 859 if heads != op.repo.heads():
855 860 raise error.PushRaced('repository changed while pushing - '
856 861 'please try again')
857 862
858 863 @parthandler('b2x:output')
859 864 def handleoutput(op, inpart):
860 865 """forward output captured on the server to the client"""
861 866 for line in inpart.read().splitlines():
862 867 op.ui.write(('remote: %s\n' % line))
863 868
864 869 @parthandler('b2x:replycaps')
865 870 def handlereplycaps(op, inpart):
866 871 """Notify that a reply bundle should be created
867 872
868 873 The payload contains the capabilities information for the reply"""
869 874 caps = decodecaps(inpart.read())
870 875 if op.reply is None:
871 876 op.reply = bundle20(op.ui, caps)
872 877
873 878 @parthandler('b2x:error:abort', ('message', 'hint'))
874 879 def handlereplycaps(op, inpart):
875 880 """Used to transmit abort error over the wire"""
876 881 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
877 882
878 883 @parthandler('b2x:error:unsupportedcontent', ('parttype', 'params'))
879 884 def handlereplycaps(op, inpart):
880 885 """Used to transmit unknown content error over the wire"""
881 886 kwargs = {}
882 887 parttype = inpart.params.get('parttype')
883 888 if parttype is not None:
884 889 kwargs['parttype'] = parttype
885 890 params = inpart.params.get('params')
886 891 if params is not None:
887 892 kwargs['params'] = params.split('\0')
888 893
889 894 raise error.BundleValueError(**kwargs)
890 895
891 896 @parthandler('b2x:error:pushraced', ('message',))
892 897 def handlereplycaps(op, inpart):
893 898 """Used to transmit push race error over the wire"""
894 899 raise error.ResponseError(_('push failed:'), inpart.params['message'])
895 900
896 901 @parthandler('b2x:listkeys', ('namespace',))
897 902 def handlelistkeys(op, inpart):
898 903 """retrieve pushkey namespace content stored in a bundle2"""
899 904 namespace = inpart.params['namespace']
900 905 r = pushkey.decodekeys(inpart.read())
901 906 op.records.add('listkeys', (namespace, r))
902 907
903 908 @parthandler('b2x:pushkey', ('namespace', 'key', 'old', 'new'))
904 909 def handlepushkey(op, inpart):
905 910 """process a pushkey request"""
906 911 dec = pushkey.decode
907 912 namespace = dec(inpart.params['namespace'])
908 913 key = dec(inpart.params['key'])
909 914 old = dec(inpart.params['old'])
910 915 new = dec(inpart.params['new'])
911 916 ret = op.repo.pushkey(namespace, key, old, new)
912 917 record = {'namespace': namespace,
913 918 'key': key,
914 919 'old': old,
915 920 'new': new}
916 921 op.records.add('pushkey', record)
917 922 if op.reply is not None:
918 923 rpart = op.reply.newpart('b2x:reply:pushkey')
919 924 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
920 925 rpart.addparam('return', '%i' % ret, mandatory=False)
921 926
922 927 @parthandler('b2x:reply:pushkey', ('return', 'in-reply-to'))
923 928 def handlepushkeyreply(op, inpart):
924 929 """retrieve the result of a pushkey request"""
925 930 ret = int(inpart.params['return'])
926 931 partid = int(inpart.params['in-reply-to'])
927 932 op.records.add('pushkey', {'return': ret}, partid)
928 933
929 934 @parthandler('b2x:obsmarkers')
930 935 def handleobsmarker(op, inpart):
931 936 """add a stream of obsmarkers to the repo"""
932 937 tr = op.gettransaction()
933 938 new = op.repo.obsstore.mergemarkers(tr, inpart.read())
934 939 if new:
935 940 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
936 941 op.records.add('obsmarkers', {'new': new})
937 942 if op.reply is not None:
938 943 rpart = op.reply.newpart('b2x:reply:obsmarkers')
939 944 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
940 945 rpart.addparam('new', '%i' % new, mandatory=False)
941 946
942 947
943 948 @parthandler('b2x:reply:obsmarkers', ('new', 'in-reply-to'))
944 949 def handlepushkeyreply(op, inpart):
945 950 """retrieve the result of a pushkey request"""
946 951 ret = int(inpart.params['new'])
947 952 partid = int(inpart.params['in-reply-to'])
948 953 op.records.add('obsmarkers', {'new': ret}, partid)
General Comments 0
You need to be logged in to leave comments. Login now