bundle20: extract core payload generation in its own function...
Pierre-Yves David
r26396:d90c3080 default
@@ -1,1434 +1,1441 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
18 18 The format is architected as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any applicative level option MUST go into a bundle2 part instead.
61 61
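For illustration only, a minimal sketch of decoding such a parameter blob
(the helper name is hypothetical and not part of this module; it mirrors the
format described above)::

    import struct, urllib

    def readstreamparams(fp):
        # read the int32 size prefix, then the parameter blob
        size = struct.unpack('>i', fp.read(4))[0]
        params = {}
        if size > 0:
            for item in fp.read(size).split(' '):
                pieces = item.split('=', 1)
                name = urllib.unquote(pieces[0])
                value = urllib.unquote(pieces[1]) if len(pieces) > 1 else None
                params[name] = value
        return params
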
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couples of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
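For illustration only, a minimal sketch of packing such a parameter block with
the struct module (the helper name is hypothetical; ``mandatory`` and
``advisory`` are lists of ``(key, value)`` byte strings)::

    import struct

    def packpartparams(mandatory, advisory):
        allparams = list(mandatory) + list(advisory)
        blob = struct.pack('>BB', len(mandatory), len(advisory))
        sizes = []
        for key, value in allparams:
            sizes.extend([len(key), len(value)])
        # one (key size, value size) couple of bytes per parameter
        blob += struct.pack('>' + 'BB' * len(allparams), *sizes)
        # then the raw key/value data, mandatory parameters first
        blob += ''.join(key + value for key, value in allparams)
        return blob
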
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155 import urllib
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 obsolete,
162 162 pushkey,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 _pack = struct.pack
169 169 _unpack = struct.unpack
170 170
171 171 _fstreamparamsize = '>i'
172 172 _fpartheadersize = '>i'
173 173 _fparttypesize = '>B'
174 174 _fpartid = '>I'
175 175 _fpayloadsize = '>i'
176 176 _fpartparamcount = '>BB'
177 177
178 178 preferedchunksize = 4096
179 179
180 180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181 181
182 182 def outdebug(ui, message):
183 183 """debug regarding output stream (bundling)"""
184 184 if ui.configbool('devel', 'bundle2.debug', False):
185 185 ui.debug('bundle2-output: %s\n' % message)
186 186
187 187 def indebug(ui, message):
188 188 """debug on input stream (unbundling)"""
189 189 if ui.configbool('devel', 'bundle2.debug', False):
190 190 ui.debug('bundle2-input: %s\n' % message)
191 191
192 192 def validateparttype(parttype):
193 193 """raise ValueError if a parttype contains invalid character"""
194 194 if _parttypeforbidden.search(parttype):
195 195 raise ValueError(parttype)
196 196
197 197 def _makefpartparamsizes(nbparams):
198 198 """return a struct format to read part parameter sizes
199 199
200 200 The number of parameters is variable so we need to build that format
201 201 dynamically.
202 202 """
203 203 return '>'+('BB'*nbparams)
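# For example, _makefpartparamsizes(3) returns '>BBBBBB': one
# (key size, value size) byte pair per parameter, ready for struct.unpack.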
204 204
205 205 parthandlermapping = {}
206 206
207 207 def parthandler(parttype, params=()):
208 208 """decorator that register a function as a bundle2 part handler
209 209
210 210 eg::
211 211
212 212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 213 def myparttypehandler(...):
214 214 '''process a part of type "my part".'''
215 215 ...
216 216 """
217 217 validateparttype(parttype)
218 218 def _decorator(func):
219 219 lparttype = parttype.lower() # enforce lower case matching.
220 220 assert lparttype not in parthandlermapping
221 221 parthandlermapping[lparttype] = func
222 222 func.params = frozenset(params)
223 223 return func
224 224 return _decorator
225 225
226 226 class unbundlerecords(object):
227 227 """keep record of what happens during and unbundle
228 228
229 229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
230 230 category of record and obj is an arbitrary object.
231 231
232 232 `records['cat']` will return all entries of this category 'cat'.
233 233
234 234 Iterating on the object itself will yield `('category', obj)` tuples
235 235 for all entries.
236 236
237 237 All iterations happen in chronological order.
238 238 """
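    # Illustrative usage (a sketch; values are examples only):
    #
    #     records = unbundlerecords()
    #     records.add('changegroup', {'return': 1})
    #     records['changegroup']   # -> ({'return': 1},)
    #     list(records)            # -> [('changegroup', {'return': 1})]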
239 239
240 240 def __init__(self):
241 241 self._categories = {}
242 242 self._sequences = []
243 243 self._replies = {}
244 244
245 245 def add(self, category, entry, inreplyto=None):
246 246 """add a new record of a given category.
247 247
248 248 The entry can then be retrieved in the list returned by
249 249 self['category']."""
250 250 self._categories.setdefault(category, []).append(entry)
251 251 self._sequences.append((category, entry))
252 252 if inreplyto is not None:
253 253 self.getreplies(inreplyto).add(category, entry)
254 254
255 255 def getreplies(self, partid):
256 256 """get the records that are replies to a specific part"""
257 257 return self._replies.setdefault(partid, unbundlerecords())
258 258
259 259 def __getitem__(self, cat):
260 260 return tuple(self._categories.get(cat, ()))
261 261
262 262 def __iter__(self):
263 263 return iter(self._sequences)
264 264
265 265 def __len__(self):
266 266 return len(self._sequences)
267 267
268 268 def __nonzero__(self):
269 269 return bool(self._sequences)
270 270
271 271 class bundleoperation(object):
272 272 """an object that represents a single bundling process
273 273
274 274 Its purpose is to carry unbundle-related objects and states.
275 275
276 276 A new object should be created at the beginning of each bundle processing.
277 277 The object is to be returned by the processing function.
278 278
279 279 The object has very little content now; it will ultimately contain:
280 280 * an access to the repo the bundle is applied to,
281 281 * a ui object,
282 282 * a way to retrieve a transaction to add changes to the repo,
283 283 * a way to record the result of processing each part,
284 284 * a way to construct a bundle response when applicable.
285 285 """
286 286
287 287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 288 self.repo = repo
289 289 self.ui = repo.ui
290 290 self.records = unbundlerecords()
291 291 self.gettransaction = transactiongetter
292 292 self.reply = None
293 293 self.captureoutput = captureoutput
294 294
295 295 class TransactionUnavailable(RuntimeError):
296 296 pass
297 297
298 298 def _notransaction():
299 299 """default method to get a transaction while processing a bundle
300 300
301 301 Raise an exception to highlight the fact that no transaction was expected
302 302 to be created"""
303 303 raise TransactionUnavailable()
304 304
305 305 def processbundle(repo, unbundler, transactiongetter=None, op=None):
306 306 """This function process a bundle, apply effect to/from a repo
307 307
308 308 It iterates over each part then searches for and uses the proper handling
309 309 code to process the part. Parts are processed in order.
310 310
311 311 This is a very early version of this function that will be strongly reworked
312 312 before final usage.
313 313
314 314 An unknown Mandatory part will abort the process.
315 315
316 316 It is temporarily possible to provide a prebuilt bundleoperation to the
317 317 function. This is used to ensure output is properly propagated in case of
318 318 an error during the unbundling. This output capturing part will likely be
319 319 reworked and this ability will probably go away in the process.
320 320 """
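    # Illustrative call (``repo`` and ``fp`` are assumed to exist; this is a
    # sketch, not part of the module):
    #
    #     op = processbundle(repo, getunbundler(repo.ui, fp))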
321 321 if op is None:
322 322 if transactiongetter is None:
323 323 transactiongetter = _notransaction
324 324 op = bundleoperation(repo, transactiongetter)
325 325 # todo:
326 326 # - replace this with an init function soon.
327 327 # - exception catching
328 328 unbundler.params
329 329 if repo.ui.debugflag:
330 330 msg = ['bundle2-input-bundle:']
331 331 if unbundler.params:
332 332 msg.append(' %i params' % len(unbundler.params))
333 333 if op.gettransaction is None:
334 334 msg.append(' no-transaction')
335 335 else:
336 336 msg.append(' with-transaction')
337 337 msg.append('\n')
338 338 repo.ui.debug(''.join(msg))
339 339 iterparts = enumerate(unbundler.iterparts())
340 340 part = None
341 341 nbpart = 0
342 342 try:
343 343 for nbpart, part in iterparts:
344 344 _processpart(op, part)
345 345 except BaseException as exc:
346 346 for nbpart, part in iterparts:
347 347 # consume the bundle content
348 348 part.seek(0, 2)
349 349 # Small hack to let caller code distinguish exceptions from bundle2
350 350 # processing from processing the old format. This is mostly
351 351 # needed to handle different return codes to unbundle according to the
352 352 # type of bundle. We should probably clean up or drop this return code
353 353 # craziness in a future version.
354 354 exc.duringunbundle2 = True
355 355 salvaged = []
356 356 replycaps = None
357 357 if op.reply is not None:
358 358 salvaged = op.reply.salvageoutput()
359 359 replycaps = op.reply.capabilities
360 360 exc._replycaps = replycaps
361 361 exc._bundle2salvagedoutput = salvaged
362 362 raise
363 363 finally:
364 364 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
365 365
366 366 return op
367 367
368 368 def _processpart(op, part):
369 369 """process a single part from a bundle
370 370
371 371 The part is guaranteed to have been fully consumed when the function exits
372 372 (even if an exception is raised)."""
373 373 status = 'unknown' # used by debug output
374 374 try:
375 375 try:
376 376 handler = parthandlermapping.get(part.type)
377 377 if handler is None:
378 378 status = 'unsupported-type'
379 379 raise error.BundleUnknownFeatureError(parttype=part.type)
380 380 indebug(op.ui, 'found a handler for part %r' % part.type)
381 381 unknownparams = part.mandatorykeys - handler.params
382 382 if unknownparams:
383 383 unknownparams = list(unknownparams)
384 384 unknownparams.sort()
385 385 status = 'unsupported-params (%s)' % unknownparams
386 386 raise error.BundleUnknownFeatureError(parttype=part.type,
387 387 params=unknownparams)
388 388 status = 'supported'
389 389 except error.BundleUnknownFeatureError as exc:
390 390 if part.mandatory: # mandatory parts
391 391 raise
392 392 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
393 393 return # skip to part processing
394 394 finally:
395 395 if op.ui.debugflag:
396 396 msg = ['bundle2-input-part: "%s"' % part.type]
397 397 if not part.mandatory:
398 398 msg.append(' (advisory)')
399 399 nbmp = len(part.mandatorykeys)
400 400 nbap = len(part.params) - nbmp
401 401 if nbmp or nbap:
402 402 msg.append(' (params:')
403 403 if nbmp:
404 404 msg.append(' %i mandatory' % nbmp)
405 405 if nbap:
406 406 msg.append(' %i advisory' % nbap)
407 407 msg.append(')')
408 408 msg.append(' %s\n' % status)
409 409 op.ui.debug(''.join(msg))
410 410
411 411 # handler is called outside the above try block so that we don't
412 412 # risk catching KeyErrors from anything other than the
413 413 # parthandlermapping lookup (any KeyError raised by handler()
414 414 # itself represents a defect of a different variety).
415 415 output = None
416 416 if op.captureoutput and op.reply is not None:
417 417 op.ui.pushbuffer(error=True, subproc=True)
418 418 output = ''
419 419 try:
420 420 handler(op, part)
421 421 finally:
422 422 if output is not None:
423 423 output = op.ui.popbuffer()
424 424 if output:
425 425 outpart = op.reply.newpart('output', data=output,
426 426 mandatory=False)
427 427 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
428 428 finally:
429 429 # consume the part content to not corrupt the stream.
430 430 part.seek(0, 2)
431 431
432 432
433 433 def decodecaps(blob):
434 434 """decode a bundle2 caps bytes blob into a dictionary
435 435
436 436 The blob is a list of capabilities (one per line)
437 437 Capabilities may have values using a line of the form::
438 438
439 439 capability=value1,value2,value3
440 440
441 441 The values are always a list."""
442 442 caps = {}
443 443 for line in blob.splitlines():
444 444 if not line:
445 445 continue
446 446 if '=' not in line:
447 447 key, vals = line, ()
448 448 else:
449 449 key, vals = line.split('=', 1)
450 450 vals = vals.split(',')
451 451 key = urllib.unquote(key)
452 452 vals = [urllib.unquote(v) for v in vals]
453 453 caps[key] = vals
454 454 return caps
455 455
456 456 def encodecaps(caps):
457 457 """encode a bundle2 caps dictionary into a bytes blob"""
458 458 chunks = []
459 459 for ca in sorted(caps):
460 460 vals = caps[ca]
461 461 ca = urllib.quote(ca)
462 462 vals = [urllib.quote(v) for v in vals]
463 463 if vals:
464 464 ca = "%s=%s" % (ca, ','.join(vals))
465 465 chunks.append(ca)
466 466 return '\n'.join(chunks)
467 467
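# Illustrative round trip of the two helpers above (values are examples only):
#
#     encodecaps({'error': ['abort', 'pushkey'], 'listkeys': []})
#         -> 'error=abort,pushkey\nlistkeys'
#     decodecaps('error=abort,pushkey\nlistkeys')
#         -> {'error': ['abort', 'pushkey'], 'listkeys': []}
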
468 468 class bundle20(object):
469 469 """represent an outgoing bundle2 container
470 470
471 471 Use the `addparam` method to add a stream level parameter and `newpart` to
472 472 populate it. Then call `getchunks` to retrieve all the binary chunks of
473 473 data that compose the bundle2 container."""
474 474
475 475 _magicstring = 'HG20'
476 476
477 477 def __init__(self, ui, capabilities=()):
478 478 self.ui = ui
479 479 self._params = []
480 480 self._parts = []
481 481 self.capabilities = dict(capabilities)
482 482
483 483 @property
484 484 def nbparts(self):
485 485 """total number of parts added to the bundler"""
486 486 return len(self._parts)
487 487
488 488 # methods used to define the bundle2 content
489 489 def addparam(self, name, value=None):
490 490 """add a stream level parameter"""
491 491 if not name:
492 492 raise ValueError('empty parameter name')
493 493 if name[0] not in string.letters:
494 494 raise ValueError('non letter first character: %r' % name)
495 495 self._params.append((name, value))
496 496
497 497 def addpart(self, part):
498 498 """add a new part to the bundle2 container
499 499
500 500 Parts contains the actual applicative payload."""
501 501 assert part.id is None
502 502 part.id = len(self._parts) # very cheap counter
503 503 self._parts.append(part)
504 504
505 505 def newpart(self, typeid, *args, **kwargs):
506 506 """create a new part and add it to the containers
507 507
508 508 The part is directly added to the container. For now, this means
509 509 that any failure to properly initialize the part after calling
510 510 ``newpart`` should result in a failure of the whole bundling process.
511 511
512 512 You can still fall back to manually creating and adding a part if you need
513 513 better control."""
514 514 part = bundlepart(typeid, *args, **kwargs)
515 515 self.addpart(part)
516 516 return part
517 517
518 518 # methods used to generate the bundle2 stream
519 519 def getchunks(self):
520 520 if self.ui.debugflag:
521 521 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
522 522 if self._params:
523 523 msg.append(' (%i params)' % len(self._params))
524 524 msg.append(' %i parts total\n' % len(self._parts))
525 525 self.ui.debug(''.join(msg))
526 526 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
527 527 yield self._magicstring
528 528 param = self._paramchunk()
529 529 outdebug(self.ui, 'bundle parameter: %s' % param)
530 530 yield _pack(_fstreamparamsize, len(param))
531 531 if param:
532 532 yield param
533
534 outdebug(self.ui, 'start of parts')
535 for part in self._parts:
536 outdebug(self.ui, 'bundle part: "%s"' % part.type)
537 for chunk in part.getchunks(ui=self.ui):
538 yield chunk
539 outdebug(self.ui, 'end of bundle')
540 yield _pack(_fpartheadersize, 0)
533 for chunk in self._getcorechunk():
534 yield chunk
541 535
542 536 def _paramchunk(self):
543 537 """return a encoded version of all stream parameters"""
544 538 blocks = []
545 539 for par, value in self._params:
546 540 par = urllib.quote(par)
547 541 if value is not None:
548 542 value = urllib.quote(value)
549 543 par = '%s=%s' % (par, value)
550 544 blocks.append(par)
551 545 return ' '.join(blocks)
552 546
547 def _getcorechunk(self):
548 """yield chunk for the core part of the bundle
549
550 (all but headers and parameters)"""
551 outdebug(self.ui, 'start of parts')
552 for part in self._parts:
553 outdebug(self.ui, 'bundle part: "%s"' % part.type)
554 for chunk in part.getchunks(ui=self.ui):
555 yield chunk
556 outdebug(self.ui, 'end of bundle')
557 yield _pack(_fpartheadersize, 0)
558
559
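    # Illustrative use of getchunks() (``ui`` and ``outfile`` are assumed to
    # exist; this is a sketch, not part of the module):
    #
    #     bundler = bundle20(ui)
    #     bundler.newpart('output', data='hello', mandatory=False)
    #     for chunk in bundler.getchunks():
    #         outfile.write(chunk)
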
553 560 def salvageoutput(self):
554 561 """return a list with a copy of all output parts in the bundle
555 562
556 563 This is meant to be used during error handling to make sure we preserve
557 564 server output"""
558 565 salvaged = []
559 566 for part in self._parts:
560 567 if part.type.startswith('output'):
561 568 salvaged.append(part.copy())
562 569 return salvaged
563 570
564 571
565 572 class unpackermixin(object):
566 573 """A mixin to extract bytes and struct data from a stream"""
567 574
568 575 def __init__(self, fp):
569 576 self._fp = fp
570 577 self._seekable = (util.safehasattr(fp, 'seek') and
571 578 util.safehasattr(fp, 'tell'))
572 579
573 580 def _unpack(self, format):
574 581 """unpack this struct format from the stream"""
575 582 data = self._readexact(struct.calcsize(format))
576 583 return _unpack(format, data)
577 584
578 585 def _readexact(self, size):
579 586 """read exactly <size> bytes from the stream"""
580 587 return changegroup.readexactly(self._fp, size)
581 588
582 589 def seek(self, offset, whence=0):
583 590 """move the underlying file pointer"""
584 591 if self._seekable:
585 592 return self._fp.seek(offset, whence)
586 593 else:
587 594 raise NotImplementedError(_('File pointer is not seekable'))
588 595
589 596 def tell(self):
590 597 """return the file offset, or None if file is not seekable"""
591 598 if self._seekable:
592 599 try:
593 600 return self._fp.tell()
594 601 except IOError as e:
595 602 if e.errno == errno.ESPIPE:
596 603 self._seekable = False
597 604 else:
598 605 raise
599 606 return None
600 607
601 608 def close(self):
602 609 """close underlying file"""
603 610 if util.safehasattr(self._fp, 'close'):
604 611 return self._fp.close()
605 612
606 613 def getunbundler(ui, fp, magicstring=None):
607 614 """return a valid unbundler object for a given magicstring"""
608 615 if magicstring is None:
609 616 magicstring = changegroup.readexactly(fp, 4)
610 617 magic, version = magicstring[0:2], magicstring[2:4]
611 618 if magic != 'HG':
612 619 raise util.Abort(_('not a Mercurial bundle'))
613 620 unbundlerclass = formatmap.get(version)
614 621 if unbundlerclass is None:
615 622 raise util.Abort(_('unknown bundle version %s') % version)
616 623 unbundler = unbundlerclass(ui, fp)
617 624 indebug(ui, 'start processing of %s stream' % magicstring)
618 625 return unbundler
619 626
620 627 class unbundle20(unpackermixin):
621 628 """interpret a bundle2 stream
622 629
623 630 This class is fed with a binary stream and yields parts through its
624 631 `iterparts` method."""
625 632
626 633 def __init__(self, ui, fp):
627 634 """If header is specified, we do not read it out of the stream."""
628 635 self.ui = ui
629 636 super(unbundle20, self).__init__(fp)
630 637
631 638 @util.propertycache
632 639 def params(self):
633 640 """dictionary of stream level parameters"""
634 641 indebug(self.ui, 'reading bundle2 stream parameters')
635 642 params = {}
636 643 paramssize = self._unpack(_fstreamparamsize)[0]
637 644 if paramssize < 0:
638 645 raise error.BundleValueError('negative bundle param size: %i'
639 646 % paramssize)
640 647 if paramssize:
641 648 for p in self._readexact(paramssize).split(' '):
642 649 p = p.split('=', 1)
643 650 p = [urllib.unquote(i) for i in p]
644 651 if len(p) < 2:
645 652 p.append(None)
646 653 self._processparam(*p)
647 654 params[p[0]] = p[1]
648 655 return params
649 656
650 657 def _processparam(self, name, value):
651 658 """process a parameter, applying its effect if needed
652 659
653 660 Parameters starting with a lower case letter are advisory and will be
654 661 ignored when unknown. Those starting with an upper case letter are
655 662 mandatory and this function will raise a KeyError when they are unknown.
656 663
657 664 Note: no options are currently supported. Any input will either be
658 665 ignored or fail.
659 666 """
660 667 if not name:
661 668 raise ValueError('empty parameter name')
662 669 if name[0] not in string.letters:
663 670 raise ValueError('non letter first character: %r' % name)
664 671 try:
665 672 handler = b2streamparamsmap[name.lower()]
666 673 except KeyError:
667 674 if name[0].islower():
668 675 indebug(self.ui, "ignoring unknown parameter %r" % name)
669 676 else:
670 677 raise error.BundleUnknownFeatureError(params=(name,))
671 678 else:
672 679 handler(self, name, value)
673 680
674 681 def iterparts(self):
675 682 """yield all parts contained in the stream"""
676 683 # make sure params have been loaded
677 684 self.params
678 685 indebug(self.ui, 'start extraction of bundle2 parts')
679 686 headerblock = self._readpartheader()
680 687 while headerblock is not None:
681 688 part = unbundlepart(self.ui, headerblock, self._fp)
682 689 yield part
683 690 part.seek(0, 2)
684 691 headerblock = self._readpartheader()
685 692 indebug(self.ui, 'end of bundle2 stream')
686 693
687 694 def _readpartheader(self):
688 695 """reads a part header size and return the bytes blob
689 696
690 697 returns None if empty"""
691 698 headersize = self._unpack(_fpartheadersize)[0]
692 699 if headersize < 0:
693 700 raise error.BundleValueError('negative part header size: %i'
694 701 % headersize)
695 702 indebug(self.ui, 'part header size: %i' % headersize)
696 703 if headersize:
697 704 return self._readexact(headersize)
698 705 return None
699 706
700 707 def compressed(self):
701 708 return False
702 709
703 710 formatmap = {'20': unbundle20}
704 711
705 712 b2streamparamsmap = {}
706 713
707 714 def b2streamparamhandler(name):
708 715 """register a handler for a stream level parameter"""
709 716 def decorator(func):
710 717 assert name not in formatmap
711 718 b2streamparamsmap[name] = func
712 719 return func
713 720 return decorator
714 721
715 722 class bundlepart(object):
716 723 """A bundle2 part contains application level payload
717 724
718 725 The part `type` is used to route the part to the application level
719 726 handler.
720 727
721 728 The part payload is contained in ``part.data``. It could be raw bytes or a
722 729 generator of byte chunks.
723 730
724 731 You can add parameters to the part using the ``addparam`` method.
725 732 Parameters can be either mandatory (default) or advisory. Remote side
726 733 should be able to safely ignore the advisory ones.
727 734
728 735 Neither data nor parameters can be modified after the generation has begun.
729 736 """
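    # Illustrative construction (a sketch; the part still has to be added to a
    # bundler, e.g. via bundle20.addpart(), to be emitted):
    #
    #     part = bundlepart('output', data='some bytes')
    #     part.addparam('in-reply-to', '0', mandatory=False)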
730 737
731 738 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
732 739 data='', mandatory=True):
733 740 validateparttype(parttype)
734 741 self.id = None
735 742 self.type = parttype
736 743 self._data = data
737 744 self._mandatoryparams = list(mandatoryparams)
738 745 self._advisoryparams = list(advisoryparams)
739 746 # checking for duplicated entries
740 747 self._seenparams = set()
741 748 for pname, __ in self._mandatoryparams + self._advisoryparams:
742 749 if pname in self._seenparams:
743 750 raise RuntimeError('duplicated params: %s' % pname)
744 751 self._seenparams.add(pname)
745 752 # status of the part's generation:
746 753 # - None: not started,
747 754 # - False: currently generated,
748 755 # - True: generation done.
749 756 self._generated = None
750 757 self.mandatory = mandatory
751 758
752 759 def copy(self):
753 760 """return a copy of the part
754 761
755 762 The new part has the very same content but no partid assigned yet.
756 763 Parts with generated data cannot be copied."""
757 764 assert not util.safehasattr(self.data, 'next')
758 765 return self.__class__(self.type, self._mandatoryparams,
759 766 self._advisoryparams, self._data, self.mandatory)
760 767
761 768 # methods used to define the part content
762 769 def __setdata(self, data):
763 770 if self._generated is not None:
764 771 raise error.ReadOnlyPartError('part is being generated')
765 772 self._data = data
766 773 def __getdata(self):
767 774 return self._data
768 775 data = property(__getdata, __setdata)
769 776
770 777 @property
771 778 def mandatoryparams(self):
772 779 # make it an immutable tuple to force people through ``addparam``
773 780 return tuple(self._mandatoryparams)
774 781
775 782 @property
776 783 def advisoryparams(self):
777 784 # make it an immutable tuple to force people through ``addparam``
778 785 return tuple(self._advisoryparams)
779 786
780 787 def addparam(self, name, value='', mandatory=True):
781 788 if self._generated is not None:
782 789 raise error.ReadOnlyPartError('part is being generated')
783 790 if name in self._seenparams:
784 791 raise ValueError('duplicated params: %s' % name)
785 792 self._seenparams.add(name)
786 793 params = self._advisoryparams
787 794 if mandatory:
788 795 params = self._mandatoryparams
789 796 params.append((name, value))
790 797
791 798 # methods used to generate the bundle2 stream
792 799 def getchunks(self, ui):
793 800 if self._generated is not None:
794 801 raise RuntimeError('part can only be consumed once')
795 802 self._generated = False
796 803
797 804 if ui.debugflag:
798 805 msg = ['bundle2-output-part: "%s"' % self.type]
799 806 if not self.mandatory:
800 807 msg.append(' (advisory)')
801 808 nbmp = len(self.mandatoryparams)
802 809 nbap = len(self.advisoryparams)
803 810 if nbmp or nbap:
804 811 msg.append(' (params:')
805 812 if nbmp:
806 813 msg.append(' %i mandatory' % nbmp)
807 814 if nbap:
808 815 msg.append(' %i advisory' % nbap)
809 816 msg.append(')')
810 817 if not self.data:
811 818 msg.append(' empty payload')
812 819 elif util.safehasattr(self.data, 'next'):
813 820 msg.append(' streamed payload')
814 821 else:
815 822 msg.append(' %i bytes payload' % len(self.data))
816 823 msg.append('\n')
817 824 ui.debug(''.join(msg))
818 825
819 826 #### header
820 827 if self.mandatory:
821 828 parttype = self.type.upper()
822 829 else:
823 830 parttype = self.type.lower()
824 831 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
825 832 ## parttype
826 833 header = [_pack(_fparttypesize, len(parttype)),
827 834 parttype, _pack(_fpartid, self.id),
828 835 ]
829 836 ## parameters
830 837 # count
831 838 manpar = self.mandatoryparams
832 839 advpar = self.advisoryparams
833 840 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
834 841 # size
835 842 parsizes = []
836 843 for key, value in manpar:
837 844 parsizes.append(len(key))
838 845 parsizes.append(len(value))
839 846 for key, value in advpar:
840 847 parsizes.append(len(key))
841 848 parsizes.append(len(value))
842 849 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
843 850 header.append(paramsizes)
844 851 # key, value
845 852 for key, value in manpar:
846 853 header.append(key)
847 854 header.append(value)
848 855 for key, value in advpar:
849 856 header.append(key)
850 857 header.append(value)
851 858 ## finalize header
852 859 headerchunk = ''.join(header)
853 860 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
854 861 yield _pack(_fpartheadersize, len(headerchunk))
855 862 yield headerchunk
856 863 ## payload
857 864 try:
858 865 for chunk in self._payloadchunks():
859 866 outdebug(ui, 'payload chunk size: %i' % len(chunk))
860 867 yield _pack(_fpayloadsize, len(chunk))
861 868 yield chunk
862 869 except GeneratorExit:
863 870 # GeneratorExit means that nobody is listening for our
864 871 # results anyway, so just bail quickly rather than trying
865 872 # to produce an error part.
866 873 ui.debug('bundle2-generatorexit\n')
867 874 raise
868 875 except BaseException as exc:
869 876 # backup exception data for later
870 877 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
871 878 % exc)
872 879 exc_info = sys.exc_info()
873 880 msg = 'unexpected error: %s' % exc
874 881 interpart = bundlepart('error:abort', [('message', msg)],
875 882 mandatory=False)
876 883 interpart.id = 0
877 884 yield _pack(_fpayloadsize, -1)
878 885 for chunk in interpart.getchunks(ui=ui):
879 886 yield chunk
880 887 outdebug(ui, 'closing payload chunk')
881 888 # abort current part payload
882 889 yield _pack(_fpayloadsize, 0)
883 890 raise exc_info[0], exc_info[1], exc_info[2]
884 891 # end of payload
885 892 outdebug(ui, 'closing payload chunk')
886 893 yield _pack(_fpayloadsize, 0)
887 894 self._generated = True
888 895
889 896 def _payloadchunks(self):
890 897 """yield chunks of a the part payload
891 898
892 899 Exists to handle the different methods to provide data to a part."""
893 900 # we only support fixed size data now.
894 901 # This will be improved in the future.
895 902 if util.safehasattr(self.data, 'next'):
896 903 buff = util.chunkbuffer(self.data)
897 904 chunk = buff.read(preferedchunksize)
898 905 while chunk:
899 906 yield chunk
900 907 chunk = buff.read(preferedchunksize)
901 908 elif len(self.data):
902 909 yield self.data
903 910
904 911
905 912 flaginterrupt = -1
906 913
907 914 class interrupthandler(unpackermixin):
908 915 """read one part and process it with restricted capability
909 916
910 917 This allows transmitting an exception raised on the producer side during
911 918 part iteration while the consumer is reading a part.
912 919
913 920 Parts processed in this manner only have access to a ui object."""
914 921
915 922 def __init__(self, ui, fp):
916 923 super(interrupthandler, self).__init__(fp)
917 924 self.ui = ui
918 925
919 926 def _readpartheader(self):
920 927 """reads a part header size and return the bytes blob
921 928
922 929 returns None if empty"""
923 930 headersize = self._unpack(_fpartheadersize)[0]
924 931 if headersize < 0:
925 932 raise error.BundleValueError('negative part header size: %i'
926 933 % headersize)
927 934 indebug(self.ui, 'part header size: %i\n' % headersize)
928 935 if headersize:
929 936 return self._readexact(headersize)
930 937 return None
931 938
932 939 def __call__(self):
933 940
934 941 self.ui.debug('bundle2-input-stream-interrupt:'
935 942 ' opening out of band context\n')
936 943 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
937 944 headerblock = self._readpartheader()
938 945 if headerblock is None:
939 946 indebug(self.ui, 'no part found during interruption.')
940 947 return
941 948 part = unbundlepart(self.ui, headerblock, self._fp)
942 949 op = interruptoperation(self.ui)
943 950 _processpart(op, part)
944 951 self.ui.debug('bundle2-input-stream-interrupt:'
945 952 ' closing out of band context\n')
946 953
947 954 class interruptoperation(object):
948 955 """A limited operation to be use by part handler during interruption
949 956
950 957 It only have access to an ui object.
951 958 """
952 959
953 960 def __init__(self, ui):
954 961 self.ui = ui
955 962 self.reply = None
956 963 self.captureoutput = False
957 964
958 965 @property
959 966 def repo(self):
960 967 raise RuntimeError('no repo access from stream interruption')
961 968
962 969 def gettransaction(self):
963 970 raise TransactionUnavailable('no repo access from stream interruption')
964 971
965 972 class unbundlepart(unpackermixin):
966 973 """a bundle part read from a bundle"""
967 974
968 975 def __init__(self, ui, header, fp):
969 976 super(unbundlepart, self).__init__(fp)
970 977 self.ui = ui
971 978 # unbundle state attr
972 979 self._headerdata = header
973 980 self._headeroffset = 0
974 981 self._initialized = False
975 982 self.consumed = False
976 983 # part data
977 984 self.id = None
978 985 self.type = None
979 986 self.mandatoryparams = None
980 987 self.advisoryparams = None
981 988 self.params = None
982 989 self.mandatorykeys = ()
983 990 self._payloadstream = None
984 991 self._readheader()
985 992 self._mandatory = None
986 993 self._chunkindex = [] #(payload, file) position tuples for chunk starts
987 994 self._pos = 0
988 995
989 996 def _fromheader(self, size):
990 997 """return the next <size> byte from the header"""
991 998 offset = self._headeroffset
992 999 data = self._headerdata[offset:(offset + size)]
993 1000 self._headeroffset = offset + size
994 1001 return data
995 1002
996 1003 def _unpackheader(self, format):
997 1004 """read given format from header
998 1005
999 1006 This automatically computes the size of the format to read."""
1000 1007 data = self._fromheader(struct.calcsize(format))
1001 1008 return _unpack(format, data)
1002 1009
1003 1010 def _initparams(self, mandatoryparams, advisoryparams):
1004 1011 """internal function to setup all logic related parameters"""
1005 1012 # make it read only to prevent people touching it by mistake.
1006 1013 self.mandatoryparams = tuple(mandatoryparams)
1007 1014 self.advisoryparams = tuple(advisoryparams)
1008 1015 # user friendly UI
1009 1016 self.params = dict(self.mandatoryparams)
1010 1017 self.params.update(dict(self.advisoryparams))
1011 1018 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1012 1019
1013 1020 def _payloadchunks(self, chunknum=0):
1014 1021 '''seek to specified chunk and start yielding data'''
1015 1022 if len(self._chunkindex) == 0:
1016 1023 assert chunknum == 0, 'Must start with chunk 0'
1017 1024 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1018 1025 else:
1019 1026 assert chunknum < len(self._chunkindex), \
1020 1027 'Unknown chunk %d' % chunknum
1021 1028 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1022 1029
1023 1030 pos = self._chunkindex[chunknum][0]
1024 1031 payloadsize = self._unpack(_fpayloadsize)[0]
1025 1032 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1026 1033 while payloadsize:
1027 1034 if payloadsize == flaginterrupt:
1028 1035 # interruption detection, the handler will now read a
1029 1036 # single part and process it.
1030 1037 interrupthandler(self.ui, self._fp)()
1031 1038 elif payloadsize < 0:
1032 1039 msg = 'negative payload chunk size: %i' % payloadsize
1033 1040 raise error.BundleValueError(msg)
1034 1041 else:
1035 1042 result = self._readexact(payloadsize)
1036 1043 chunknum += 1
1037 1044 pos += payloadsize
1038 1045 if chunknum == len(self._chunkindex):
1039 1046 self._chunkindex.append((pos,
1040 1047 super(unbundlepart, self).tell()))
1041 1048 yield result
1042 1049 payloadsize = self._unpack(_fpayloadsize)[0]
1043 1050 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1044 1051
1045 1052 def _findchunk(self, pos):
1046 1053 '''for a given payload position, return a chunk number and offset'''
1047 1054 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1048 1055 if ppos == pos:
1049 1056 return chunk, 0
1050 1057 elif ppos > pos:
1051 1058 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1052 1059 raise ValueError('Unknown chunk')
1053 1060
1054 1061 def _readheader(self):
1055 1062 """read the header and setup the object"""
1056 1063 typesize = self._unpackheader(_fparttypesize)[0]
1057 1064 self.type = self._fromheader(typesize)
1058 1065 indebug(self.ui, 'part type: "%s"' % self.type)
1059 1066 self.id = self._unpackheader(_fpartid)[0]
1060 1067 indebug(self.ui, 'part id: "%s"' % self.id)
1061 1068 # extract mandatory bit from type
1062 1069 self.mandatory = (self.type != self.type.lower())
1063 1070 self.type = self.type.lower()
1064 1071 ## reading parameters
1065 1072 # param count
1066 1073 mancount, advcount = self._unpackheader(_fpartparamcount)
1067 1074 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1068 1075 # param size
1069 1076 fparamsizes = _makefpartparamsizes(mancount + advcount)
1070 1077 paramsizes = self._unpackheader(fparamsizes)
1071 1078 # make it a list of couple again
1072 1079 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1073 1080 # split mandatory from advisory
1074 1081 mansizes = paramsizes[:mancount]
1075 1082 advsizes = paramsizes[mancount:]
1076 1083 # retrieve param value
1077 1084 manparams = []
1078 1085 for key, value in mansizes:
1079 1086 manparams.append((self._fromheader(key), self._fromheader(value)))
1080 1087 advparams = []
1081 1088 for key, value in advsizes:
1082 1089 advparams.append((self._fromheader(key), self._fromheader(value)))
1083 1090 self._initparams(manparams, advparams)
1084 1091 ## part payload
1085 1092 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1086 1093 # we read the data, tell it
1087 1094 self._initialized = True
1088 1095
1089 1096 def read(self, size=None):
1090 1097 """read payload data"""
1091 1098 if not self._initialized:
1092 1099 self._readheader()
1093 1100 if size is None:
1094 1101 data = self._payloadstream.read()
1095 1102 else:
1096 1103 data = self._payloadstream.read(size)
1097 1104 self._pos += len(data)
1098 1105 if size is None or len(data) < size:
1099 1106 if not self.consumed and self._pos:
1100 1107 self.ui.debug('bundle2-input-part: total payload size %i\n'
1101 1108 % self._pos)
1102 1109 self.consumed = True
1103 1110 return data
1104 1111
1105 1112 def tell(self):
1106 1113 return self._pos
1107 1114
1108 1115 def seek(self, offset, whence=0):
1109 1116 if whence == 0:
1110 1117 newpos = offset
1111 1118 elif whence == 1:
1112 1119 newpos = self._pos + offset
1113 1120 elif whence == 2:
1114 1121 if not self.consumed:
1115 1122 self.read()
1116 1123 newpos = self._chunkindex[-1][0] - offset
1117 1124 else:
1118 1125 raise ValueError('Unknown whence value: %r' % (whence,))
1119 1126
1120 1127 if newpos > self._chunkindex[-1][0] and not self.consumed:
1121 1128 self.read()
1122 1129 if not 0 <= newpos <= self._chunkindex[-1][0]:
1123 1130 raise ValueError('Offset out of range')
1124 1131
1125 1132 if self._pos != newpos:
1126 1133 chunk, internaloffset = self._findchunk(newpos)
1127 1134 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1128 1135 adjust = self.read(internaloffset)
1129 1136 if len(adjust) != internaloffset:
1130 1137 raise util.Abort(_('Seek failed\n'))
1131 1138 self._pos = newpos
1132 1139
1133 1140 # These are only the static capabilities.
1134 1141 # Check the 'getrepocaps' function for the rest.
1135 1142 capabilities = {'HG20': (),
1136 1143 'error': ('abort', 'unsupportedcontent', 'pushraced',
1137 1144 'pushkey'),
1138 1145 'listkeys': (),
1139 1146 'pushkey': (),
1140 1147 'digests': tuple(sorted(util.DIGESTS.keys())),
1141 1148 'remote-changegroup': ('http', 'https'),
1142 1149 'hgtagsfnodes': (),
1143 1150 }
1144 1151
1145 1152 def getrepocaps(repo, allowpushback=False):
1146 1153 """return the bundle2 capabilities for a given repo
1147 1154
1148 1155 Exists to allow extensions (like evolution) to mutate the capabilities.
1149 1156 """
1150 1157 caps = capabilities.copy()
1151 1158 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1152 1159 if obsolete.isenabled(repo, obsolete.exchangeopt):
1153 1160 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1154 1161 caps['obsmarkers'] = supportedformat
1155 1162 if allowpushback:
1156 1163 caps['pushback'] = ()
1157 1164 return caps
1158 1165
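# Illustrative sketch of how these capabilities typically reach a client (the
# server-side wiring lives outside this module and may differ):
#
#     import urllib
#     capsblob = encodecaps(getrepocaps(repo))
#     advertised = 'bundle2=' + urllib.quote(capsblob)
#
# bundle2caps() below performs the reverse operation on the blob received from
# a peer.
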
1159 1166 def bundle2caps(remote):
1160 1167 """return the bundle capabilities of a peer as dict"""
1161 1168 raw = remote.capable('bundle2')
1162 1169 if not raw and raw != '':
1163 1170 return {}
1164 1171 capsblob = urllib.unquote(remote.capable('bundle2'))
1165 1172 return decodecaps(capsblob)
1166 1173
1167 1174 def obsmarkersversion(caps):
1168 1175 """extract the list of supported obsmarkers versions from a bundle2caps dict
1169 1176 """
1170 1177 obscaps = caps.get('obsmarkers', ())
1171 1178 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1172 1179
1173 1180 @parthandler('changegroup', ('version', 'nbchanges'))
1174 1181 def handlechangegroup(op, inpart):
1175 1182 """apply a changegroup part on the repo
1176 1183
1177 1184 This is a very early implementation that will be massively reworked before
1178 1185 being inflicted on any end user.
1179 1186 """
1180 1187 # Make sure we trigger a transaction creation
1181 1188 #
1182 1189 # The addchangegroup function will get a transaction object by itself, but
1183 1190 # we need to make sure we trigger the creation of a transaction object used
1184 1191 # for the whole processing scope.
1185 1192 op.gettransaction()
1186 1193 unpackerversion = inpart.params.get('version', '01')
1187 1194 # We should raise an appropriate exception here
1188 1195 unpacker = changegroup.packermap[unpackerversion][1]
1189 1196 cg = unpacker(inpart, None)
1190 1197 # the source and url passed here are overwritten by the ones contained in
1191 1198 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1192 1199 nbchangesets = None
1193 1200 if 'nbchanges' in inpart.params:
1194 1201 nbchangesets = int(inpart.params.get('nbchanges'))
1195 1202 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1196 1203 expectedtotal=nbchangesets)
1197 1204 op.records.add('changegroup', {'return': ret})
1198 1205 if op.reply is not None:
1199 1206 # This is definitely not the final form of this
1200 1207 # return. But one needs to start somewhere.
1201 1208 part = op.reply.newpart('reply:changegroup', mandatory=False)
1202 1209 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1203 1210 part.addparam('return', '%i' % ret, mandatory=False)
1204 1211 assert not inpart.read()
1205 1212
1206 1213 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1207 1214 ['digest:%s' % k for k in util.DIGESTS.keys()])
1208 1215 @parthandler('remote-changegroup', _remotechangegroupparams)
1209 1216 def handleremotechangegroup(op, inpart):
1210 1217 """apply a bundle10 on the repo, given an url and validation information
1211 1218
1212 1219 All the information about the remote bundle to import is given as
1213 1220 parameters. The parameters include:
1214 1221 - url: the url to the bundle10.
1215 1222 - size: the bundle10 file size. It is used to validate what was
1216 1223 retrieved by the client matches the server knowledge about the bundle.
1217 1224 - digests: a space separated list of the digest types provided as
1218 1225 parameters.
1219 1226 - digest:<digest-type>: the hexadecimal representation of the digest with
1220 1227 that name. Like the size, it is used to validate what was retrieved by
1221 1228 the client matches what the server knows about the bundle.
1222 1229
1223 1230 When multiple digest types are given, all of them are checked.
1224 1231 """
1225 1232 try:
1226 1233 raw_url = inpart.params['url']
1227 1234 except KeyError:
1228 1235 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1229 1236 parsed_url = util.url(raw_url)
1230 1237 if parsed_url.scheme not in capabilities['remote-changegroup']:
1231 1238 raise util.Abort(_('remote-changegroup does not support %s urls') %
1232 1239 parsed_url.scheme)
1233 1240
1234 1241 try:
1235 1242 size = int(inpart.params['size'])
1236 1243 except ValueError:
1237 1244 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1238 1245 % 'size')
1239 1246 except KeyError:
1240 1247 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1241 1248
1242 1249 digests = {}
1243 1250 for typ in inpart.params.get('digests', '').split():
1244 1251 param = 'digest:%s' % typ
1245 1252 try:
1246 1253 value = inpart.params[param]
1247 1254 except KeyError:
1248 1255 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1249 1256 param)
1250 1257 digests[typ] = value
1251 1258
1252 1259 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1253 1260
1254 1261 # Make sure we trigger a transaction creation
1255 1262 #
1256 1263 # The addchangegroup function will get a transaction object by itself, but
1257 1264 # we need to make sure we trigger the creation of a transaction object used
1258 1265 # for the whole processing scope.
1259 1266 op.gettransaction()
1260 1267 from . import exchange
1261 1268 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1262 1269 if not isinstance(cg, changegroup.cg1unpacker):
1263 1270 raise util.Abort(_('%s: not a bundle version 1.0') %
1264 1271 util.hidepassword(raw_url))
1265 1272 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1266 1273 op.records.add('changegroup', {'return': ret})
1267 1274 if op.reply is not None:
1268 1275 # This is definitely not the final form of this
1269 1276 # return. But one needs to start somewhere.
1270 1277 part = op.reply.newpart('reply:changegroup')
1271 1278 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1272 1279 part.addparam('return', '%i' % ret, mandatory=False)
1273 1280 try:
1274 1281 real_part.validate()
1275 1282 except util.Abort as e:
1276 1283 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1277 1284 (util.hidepassword(raw_url), str(e)))
1278 1285 assert not inpart.read()
1279 1286
1280 1287 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1281 1288 def handlereplychangegroup(op, inpart):
1282 1289 ret = int(inpart.params['return'])
1283 1290 replyto = int(inpart.params['in-reply-to'])
1284 1291 op.records.add('changegroup', {'return': ret}, replyto)
1285 1292
1286 1293 @parthandler('check:heads')
1287 1294 def handlecheckheads(op, inpart):
1288 1295 """check that head of the repo did not change
1289 1296
1290 1297 This is used to detect a push race when using unbundle.
1291 1298 This replaces the "heads" argument of unbundle."""
1292 1299 h = inpart.read(20)
1293 1300 heads = []
1294 1301 while len(h) == 20:
1295 1302 heads.append(h)
1296 1303 h = inpart.read(20)
1297 1304 assert not h
1298 1305 if heads != op.repo.heads():
1299 1306 raise error.PushRaced('repository changed while pushing - '
1300 1307 'please try again')
1301 1308
1302 1309 @parthandler('output')
1303 1310 def handleoutput(op, inpart):
1304 1311 """forward output captured on the server to the client"""
1305 1312 for line in inpart.read().splitlines():
1306 1313 op.ui.status(('remote: %s\n' % line))
1307 1314
1308 1315 @parthandler('replycaps')
1309 1316 def handlereplycaps(op, inpart):
1310 1317 """Notify that a reply bundle should be created
1311 1318
1312 1319 The payload contains the capabilities information for the reply"""
1313 1320 caps = decodecaps(inpart.read())
1314 1321 if op.reply is None:
1315 1322 op.reply = bundle20(op.ui, caps)
1316 1323
1317 1324 @parthandler('error:abort', ('message', 'hint'))
1318 1325 def handleerrorabort(op, inpart):
1319 1326 """Used to transmit abort error over the wire"""
1320 1327 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1321 1328
1322 1329 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1323 1330 'in-reply-to'))
1324 1331 def handleerrorpushkey(op, inpart):
1325 1332 """Used to transmit failure of a mandatory pushkey over the wire"""
1326 1333 kwargs = {}
1327 1334 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1328 1335 value = inpart.params.get(name)
1329 1336 if value is not None:
1330 1337 kwargs[name] = value
1331 1338 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1332 1339
1333 1340 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1334 1341 def handleerrorunsupportedcontent(op, inpart):
1335 1342 """Used to transmit unknown content error over the wire"""
1336 1343 kwargs = {}
1337 1344 parttype = inpart.params.get('parttype')
1338 1345 if parttype is not None:
1339 1346 kwargs['parttype'] = parttype
1340 1347 params = inpart.params.get('params')
1341 1348 if params is not None:
1342 1349 kwargs['params'] = params.split('\0')
1343 1350
1344 1351 raise error.BundleUnknownFeatureError(**kwargs)
1345 1352
1346 1353 @parthandler('error:pushraced', ('message',))
1347 1354 def handleerrorpushraced(op, inpart):
1348 1355 """Used to transmit push race error over the wire"""
1349 1356 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1350 1357
1351 1358 @parthandler('listkeys', ('namespace',))
1352 1359 def handlelistkeys(op, inpart):
1353 1360 """retrieve pushkey namespace content stored in a bundle2"""
1354 1361 namespace = inpart.params['namespace']
1355 1362 r = pushkey.decodekeys(inpart.read())
1356 1363 op.records.add('listkeys', (namespace, r))
1357 1364
1358 1365 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1359 1366 def handlepushkey(op, inpart):
1360 1367 """process a pushkey request"""
1361 1368 dec = pushkey.decode
1362 1369 namespace = dec(inpart.params['namespace'])
1363 1370 key = dec(inpart.params['key'])
1364 1371 old = dec(inpart.params['old'])
1365 1372 new = dec(inpart.params['new'])
1366 1373 ret = op.repo.pushkey(namespace, key, old, new)
1367 1374 record = {'namespace': namespace,
1368 1375 'key': key,
1369 1376 'old': old,
1370 1377 'new': new}
1371 1378 op.records.add('pushkey', record)
1372 1379 if op.reply is not None:
1373 1380 rpart = op.reply.newpart('reply:pushkey')
1374 1381 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1375 1382 rpart.addparam('return', '%i' % ret, mandatory=False)
1376 1383 if inpart.mandatory and not ret:
1377 1384 kwargs = {}
1378 1385 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1379 1386 if key in inpart.params:
1380 1387 kwargs[key] = inpart.params[key]
1381 1388 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1382 1389
1383 1390 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1384 1391 def handlepushkeyreply(op, inpart):
1385 1392 """retrieve the result of a pushkey request"""
1386 1393 ret = int(inpart.params['return'])
1387 1394 partid = int(inpart.params['in-reply-to'])
1388 1395 op.records.add('pushkey', {'return': ret}, partid)
1389 1396
1390 1397 @parthandler('obsmarkers')
1391 1398 def handleobsmarker(op, inpart):
1392 1399 """add a stream of obsmarkers to the repo"""
1393 1400 tr = op.gettransaction()
1394 1401 markerdata = inpart.read()
1395 1402 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1396 1403 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1397 1404 % len(markerdata))
1398 1405 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1399 1406 if new:
1400 1407 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1401 1408 op.records.add('obsmarkers', {'new': new})
1402 1409 if op.reply is not None:
1403 1410 rpart = op.reply.newpart('reply:obsmarkers')
1404 1411 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1405 1412 rpart.addparam('new', '%i' % new, mandatory=False)
1406 1413
1407 1414
1408 1415 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1409 1416 def handleobsmarkerreply(op, inpart):
1410 1417 """retrieve the result of a pushkey request"""
1411 1418 ret = int(inpart.params['new'])
1412 1419 partid = int(inpart.params['in-reply-to'])
1413 1420 op.records.add('obsmarkers', {'new': ret}, partid)
1414 1421
1415 1422 @parthandler('hgtagsfnodes')
1416 1423 def handlehgtagsfnodes(op, inpart):
1417 1424 """Applies .hgtags fnodes cache entries to the local repo.
1418 1425
1419 1426 Payload is pairs of 20 byte changeset nodes and filenodes.
1420 1427 """
1421 1428 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1422 1429
1423 1430 count = 0
1424 1431 while True:
1425 1432 node = inpart.read(20)
1426 1433 fnode = inpart.read(20)
1427 1434 if len(node) < 20 or len(fnode) < 20:
1428 1435 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1429 1436 break
1430 1437 cache.setfnode(node, fnode)
1431 1438 count += 1
1432 1439
1433 1440 cache.write()
1434 1441 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)