##// END OF EJS Templates
bundle2: use absolute_import
Gregory Szorc -
r25919:8221fefa default
parent child Browse files
Show More
@@ -1,1410 +1,1416 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architectured as follow
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 the Binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 Binary format is as follow
33 33
34 34 :params size: int32
35 35
36 36 The total number of Bytes used by the parameters
37 37
38 38 :params value: arbitrary number of Bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty name are obviously forbidden.
47 47
48 48 Name MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if he is not able to proceed it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any Applicative level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 Binary format is as follow
66 66
67 67 :header size: int32
68 68
69 69 The total number of Bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two piece of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route an application level handler, that can
78 78 interpret payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is has follow
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N couple of bytes, where N is the total number of parameters. Each
106 106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters comes first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` are plain bytes (as much as
123 123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handler are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the Part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the part read from an abort are processed. In the
144 144 future, dropping the stream may become an option for channel we do not care to
145 145 preserve.
146 146 """
147 147
148 from __future__ import absolute_import
149
148 150 import errno
149 import sys
150 import util
151 import struct
152 import urllib
151 import re
153 152 import string
154 import obsolete
155 import pushkey
156 import url
157 import re
153 import struct
154 import sys
155 import urllib
158 156
159 import changegroup, error, tags
160 from i18n import _
157 from .i18n import _
158 from . import (
159 changegroup,
160 error,
161 obsolete,
162 pushkey,
163 tags,
164 url,
165 util,
166 )
161 167
162 168 _pack = struct.pack
163 169 _unpack = struct.unpack
164 170
165 171 _fstreamparamsize = '>i'
166 172 _fpartheadersize = '>i'
167 173 _fparttypesize = '>B'
168 174 _fpartid = '>I'
169 175 _fpayloadsize = '>i'
170 176 _fpartparamcount = '>BB'
171 177
172 178 preferedchunksize = 4096
173 179
174 180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
175 181
176 182 def outdebug(ui, message):
177 183 """debug regarding output stream (bundling)"""
178 184 if ui.configbool('devel', 'bundle2.debug', False):
179 185 ui.debug('bundle2-output: %s\n' % message)
180 186
181 187 def indebug(ui, message):
182 188 """debug on input stream (unbundling)"""
183 189 if ui.configbool('devel', 'bundle2.debug', False):
184 190 ui.debug('bundle2-input: %s\n' % message)
185 191
186 192 def validateparttype(parttype):
187 193 """raise ValueError if a parttype contains invalid character"""
188 194 if _parttypeforbidden.search(parttype):
189 195 raise ValueError(parttype)
190 196
191 197 def _makefpartparamsizes(nbparams):
192 198 """return a struct format to read part parameter sizes
193 199
194 200 The number parameters is variable so we need to build that format
195 201 dynamically.
196 202 """
197 203 return '>'+('BB'*nbparams)
198 204
199 205 parthandlermapping = {}
200 206
201 207 def parthandler(parttype, params=()):
202 208 """decorator that register a function as a bundle2 part handler
203 209
204 210 eg::
205 211
206 212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
207 213 def myparttypehandler(...):
208 214 '''process a part of type "my part".'''
209 215 ...
210 216 """
211 217 validateparttype(parttype)
212 218 def _decorator(func):
213 219 lparttype = parttype.lower() # enforce lower case matching.
214 220 assert lparttype not in parthandlermapping
215 221 parthandlermapping[lparttype] = func
216 222 func.params = frozenset(params)
217 223 return func
218 224 return _decorator
219 225
220 226 class unbundlerecords(object):
221 227 """keep record of what happens during and unbundle
222 228
223 229 New records are added using `records.add('cat', obj)`. Where 'cat' is a
224 230 category of record and obj is an arbitrary object.
225 231
226 232 `records['cat']` will return all entries of this category 'cat'.
227 233
228 234 Iterating on the object itself will yield `('category', obj)` tuples
229 235 for all entries.
230 236
231 237 All iterations happens in chronological order.
232 238 """
233 239
234 240 def __init__(self):
235 241 self._categories = {}
236 242 self._sequences = []
237 243 self._replies = {}
238 244
239 245 def add(self, category, entry, inreplyto=None):
240 246 """add a new record of a given category.
241 247
242 248 The entry can then be retrieved in the list returned by
243 249 self['category']."""
244 250 self._categories.setdefault(category, []).append(entry)
245 251 self._sequences.append((category, entry))
246 252 if inreplyto is not None:
247 253 self.getreplies(inreplyto).add(category, entry)
248 254
249 255 def getreplies(self, partid):
250 256 """get the records that are replies to a specific part"""
251 257 return self._replies.setdefault(partid, unbundlerecords())
252 258
253 259 def __getitem__(self, cat):
254 260 return tuple(self._categories.get(cat, ()))
255 261
256 262 def __iter__(self):
257 263 return iter(self._sequences)
258 264
259 265 def __len__(self):
260 266 return len(self._sequences)
261 267
262 268 def __nonzero__(self):
263 269 return bool(self._sequences)
264 270
265 271 class bundleoperation(object):
266 272 """an object that represents a single bundling process
267 273
268 274 Its purpose is to carry unbundle-related objects and states.
269 275
270 276 A new object should be created at the beginning of each bundle processing.
271 277 The object is to be returned by the processing function.
272 278
273 279 The object has very little content now it will ultimately contain:
274 280 * an access to the repo the bundle is applied to,
275 281 * a ui object,
276 282 * a way to retrieve a transaction to add changes to the repo,
277 283 * a way to record the result of processing each part,
278 284 * a way to construct a bundle response when applicable.
279 285 """
280 286
281 287 def __init__(self, repo, transactiongetter, captureoutput=True):
282 288 self.repo = repo
283 289 self.ui = repo.ui
284 290 self.records = unbundlerecords()
285 291 self.gettransaction = transactiongetter
286 292 self.reply = None
287 293 self.captureoutput = captureoutput
288 294
289 295 class TransactionUnavailable(RuntimeError):
290 296 pass
291 297
292 298 def _notransaction():
293 299 """default method to get a transaction while processing a bundle
294 300
295 301 Raise an exception to highlight the fact that no transaction was expected
296 302 to be created"""
297 303 raise TransactionUnavailable()
298 304
299 305 def processbundle(repo, unbundler, transactiongetter=None, op=None):
300 306 """This function process a bundle, apply effect to/from a repo
301 307
302 308 It iterates over each part then searches for and uses the proper handling
303 309 code to process the part. Parts are processed in order.
304 310
305 311 This is very early version of this function that will be strongly reworked
306 312 before final usage.
307 313
308 314 Unknown Mandatory part will abort the process.
309 315
310 316 It is temporarily possible to provide a prebuilt bundleoperation to the
311 317 function. This is used to ensure output is properly propagated in case of
312 318 an error during the unbundling. This output capturing part will likely be
313 319 reworked and this ability will probably go away in the process.
314 320 """
315 321 if op is None:
316 322 if transactiongetter is None:
317 323 transactiongetter = _notransaction
318 324 op = bundleoperation(repo, transactiongetter)
319 325 # todo:
320 326 # - replace this is a init function soon.
321 327 # - exception catching
322 328 unbundler.params
323 329 if repo.ui.debugflag:
324 330 msg = ['bundle2-input-bundle:']
325 331 if unbundler.params:
326 332 msg.append(' %i params')
327 333 if op.gettransaction is None:
328 334 msg.append(' no-transaction')
329 335 else:
330 336 msg.append(' with-transaction')
331 337 msg.append('\n')
332 338 repo.ui.debug(''.join(msg))
333 339 iterparts = enumerate(unbundler.iterparts())
334 340 part = None
335 341 nbpart = 0
336 342 try:
337 343 for nbpart, part in iterparts:
338 344 _processpart(op, part)
339 345 except BaseException as exc:
340 346 for nbpart, part in iterparts:
341 347 # consume the bundle content
342 348 part.seek(0, 2)
343 349 # Small hack to let caller code distinguish exceptions from bundle2
344 350 # processing from processing the old format. This is mostly
345 351 # needed to handle different return codes to unbundle according to the
346 352 # type of bundle. We should probably clean up or drop this return code
347 353 # craziness in a future version.
348 354 exc.duringunbundle2 = True
349 355 salvaged = []
350 356 replycaps = None
351 357 if op.reply is not None:
352 358 salvaged = op.reply.salvageoutput()
353 359 replycaps = op.reply.capabilities
354 360 exc._replycaps = replycaps
355 361 exc._bundle2salvagedoutput = salvaged
356 362 raise
357 363 finally:
358 364 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
359 365
360 366 return op
361 367
362 368 def _processpart(op, part):
363 369 """process a single part from a bundle
364 370
365 371 The part is guaranteed to have been fully consumed when the function exits
366 372 (even if an exception is raised)."""
367 373 status = 'unknown' # used by debug output
368 374 try:
369 375 try:
370 376 handler = parthandlermapping.get(part.type)
371 377 if handler is None:
372 378 status = 'unsupported-type'
373 379 raise error.UnsupportedPartError(parttype=part.type)
374 380 indebug(op.ui, 'found a handler for part %r' % part.type)
375 381 unknownparams = part.mandatorykeys - handler.params
376 382 if unknownparams:
377 383 unknownparams = list(unknownparams)
378 384 unknownparams.sort()
379 385 status = 'unsupported-params (%s)' % unknownparams
380 386 raise error.UnsupportedPartError(parttype=part.type,
381 387 params=unknownparams)
382 388 status = 'supported'
383 389 except error.UnsupportedPartError as exc:
384 390 if part.mandatory: # mandatory parts
385 391 raise
386 392 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
387 393 return # skip to part processing
388 394 finally:
389 395 if op.ui.debugflag:
390 396 msg = ['bundle2-input-part: "%s"' % part.type]
391 397 if not part.mandatory:
392 398 msg.append(' (advisory)')
393 399 nbmp = len(part.mandatorykeys)
394 400 nbap = len(part.params) - nbmp
395 401 if nbmp or nbap:
396 402 msg.append(' (params:')
397 403 if nbmp:
398 404 msg.append(' %i mandatory' % nbmp)
399 405 if nbap:
400 406 msg.append(' %i advisory' % nbmp)
401 407 msg.append(')')
402 408 msg.append(' %s\n' % status)
403 409 op.ui.debug(''.join(msg))
404 410
405 411 # handler is called outside the above try block so that we don't
406 412 # risk catching KeyErrors from anything other than the
407 413 # parthandlermapping lookup (any KeyError raised by handler()
408 414 # itself represents a defect of a different variety).
409 415 output = None
410 416 if op.captureoutput and op.reply is not None:
411 417 op.ui.pushbuffer(error=True, subproc=True)
412 418 output = ''
413 419 try:
414 420 handler(op, part)
415 421 finally:
416 422 if output is not None:
417 423 output = op.ui.popbuffer()
418 424 if output:
419 425 outpart = op.reply.newpart('output', data=output,
420 426 mandatory=False)
421 427 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
422 428 finally:
423 429 # consume the part content to not corrupt the stream.
424 430 part.seek(0, 2)
425 431
426 432
427 433 def decodecaps(blob):
428 434 """decode a bundle2 caps bytes blob into a dictionary
429 435
430 436 The blob is a list of capabilities (one per line)
431 437 Capabilities may have values using a line of the form::
432 438
433 439 capability=value1,value2,value3
434 440
435 441 The values are always a list."""
436 442 caps = {}
437 443 for line in blob.splitlines():
438 444 if not line:
439 445 continue
440 446 if '=' not in line:
441 447 key, vals = line, ()
442 448 else:
443 449 key, vals = line.split('=', 1)
444 450 vals = vals.split(',')
445 451 key = urllib.unquote(key)
446 452 vals = [urllib.unquote(v) for v in vals]
447 453 caps[key] = vals
448 454 return caps
449 455
450 456 def encodecaps(caps):
451 457 """encode a bundle2 caps dictionary into a bytes blob"""
452 458 chunks = []
453 459 for ca in sorted(caps):
454 460 vals = caps[ca]
455 461 ca = urllib.quote(ca)
456 462 vals = [urllib.quote(v) for v in vals]
457 463 if vals:
458 464 ca = "%s=%s" % (ca, ','.join(vals))
459 465 chunks.append(ca)
460 466 return '\n'.join(chunks)
461 467
462 468 class bundle20(object):
463 469 """represent an outgoing bundle2 container
464 470
465 471 Use the `addparam` method to add stream level parameter. and `newpart` to
466 472 populate it. Then call `getchunks` to retrieve all the binary chunks of
467 473 data that compose the bundle2 container."""
468 474
469 475 _magicstring = 'HG20'
470 476
471 477 def __init__(self, ui, capabilities=()):
472 478 self.ui = ui
473 479 self._params = []
474 480 self._parts = []
475 481 self.capabilities = dict(capabilities)
476 482
477 483 @property
478 484 def nbparts(self):
479 485 """total number of parts added to the bundler"""
480 486 return len(self._parts)
481 487
482 488 # methods used to defines the bundle2 content
483 489 def addparam(self, name, value=None):
484 490 """add a stream level parameter"""
485 491 if not name:
486 492 raise ValueError('empty parameter name')
487 493 if name[0] not in string.letters:
488 494 raise ValueError('non letter first character: %r' % name)
489 495 self._params.append((name, value))
490 496
491 497 def addpart(self, part):
492 498 """add a new part to the bundle2 container
493 499
494 500 Parts contains the actual applicative payload."""
495 501 assert part.id is None
496 502 part.id = len(self._parts) # very cheap counter
497 503 self._parts.append(part)
498 504
499 505 def newpart(self, typeid, *args, **kwargs):
500 506 """create a new part and add it to the containers
501 507
502 508 As the part is directly added to the containers. For now, this means
503 509 that any failure to properly initialize the part after calling
504 510 ``newpart`` should result in a failure of the whole bundling process.
505 511
506 512 You can still fall back to manually create and add if you need better
507 513 control."""
508 514 part = bundlepart(typeid, *args, **kwargs)
509 515 self.addpart(part)
510 516 return part
511 517
512 518 # methods used to generate the bundle2 stream
513 519 def getchunks(self):
514 520 if self.ui.debugflag:
515 521 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
516 522 if self._params:
517 523 msg.append(' (%i params)' % len(self._params))
518 524 msg.append(' %i parts total\n' % len(self._parts))
519 525 self.ui.debug(''.join(msg))
520 526 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
521 527 yield self._magicstring
522 528 param = self._paramchunk()
523 529 outdebug(self.ui, 'bundle parameter: %s' % param)
524 530 yield _pack(_fstreamparamsize, len(param))
525 531 if param:
526 532 yield param
527 533
528 534 outdebug(self.ui, 'start of parts')
529 535 for part in self._parts:
530 536 outdebug(self.ui, 'bundle part: "%s"' % part.type)
531 537 for chunk in part.getchunks(ui=self.ui):
532 538 yield chunk
533 539 outdebug(self.ui, 'end of bundle')
534 540 yield _pack(_fpartheadersize, 0)
535 541
536 542 def _paramchunk(self):
537 543 """return a encoded version of all stream parameters"""
538 544 blocks = []
539 545 for par, value in self._params:
540 546 par = urllib.quote(par)
541 547 if value is not None:
542 548 value = urllib.quote(value)
543 549 par = '%s=%s' % (par, value)
544 550 blocks.append(par)
545 551 return ' '.join(blocks)
546 552
547 553 def salvageoutput(self):
548 554 """return a list with a copy of all output parts in the bundle
549 555
550 556 This is meant to be used during error handling to make sure we preserve
551 557 server output"""
552 558 salvaged = []
553 559 for part in self._parts:
554 560 if part.type.startswith('output'):
555 561 salvaged.append(part.copy())
556 562 return salvaged
557 563
558 564
559 565 class unpackermixin(object):
560 566 """A mixin to extract bytes and struct data from a stream"""
561 567
562 568 def __init__(self, fp):
563 569 self._fp = fp
564 570 self._seekable = (util.safehasattr(fp, 'seek') and
565 571 util.safehasattr(fp, 'tell'))
566 572
567 573 def _unpack(self, format):
568 574 """unpack this struct format from the stream"""
569 575 data = self._readexact(struct.calcsize(format))
570 576 return _unpack(format, data)
571 577
572 578 def _readexact(self, size):
573 579 """read exactly <size> bytes from the stream"""
574 580 return changegroup.readexactly(self._fp, size)
575 581
576 582 def seek(self, offset, whence=0):
577 583 """move the underlying file pointer"""
578 584 if self._seekable:
579 585 return self._fp.seek(offset, whence)
580 586 else:
581 587 raise NotImplementedError(_('File pointer is not seekable'))
582 588
583 589 def tell(self):
584 590 """return the file offset, or None if file is not seekable"""
585 591 if self._seekable:
586 592 try:
587 593 return self._fp.tell()
588 594 except IOError as e:
589 595 if e.errno == errno.ESPIPE:
590 596 self._seekable = False
591 597 else:
592 598 raise
593 599 return None
594 600
595 601 def close(self):
596 602 """close underlying file"""
597 603 if util.safehasattr(self._fp, 'close'):
598 604 return self._fp.close()
599 605
600 606 def getunbundler(ui, fp, magicstring=None):
601 607 """return a valid unbundler object for a given magicstring"""
602 608 if magicstring is None:
603 609 magicstring = changegroup.readexactly(fp, 4)
604 610 magic, version = magicstring[0:2], magicstring[2:4]
605 611 if magic != 'HG':
606 612 raise util.Abort(_('not a Mercurial bundle'))
607 613 unbundlerclass = formatmap.get(version)
608 614 if unbundlerclass is None:
609 615 raise util.Abort(_('unknown bundle version %s') % version)
610 616 unbundler = unbundlerclass(ui, fp)
611 617 indebug(ui, 'start processing of %s stream' % magicstring)
612 618 return unbundler
613 619
614 620 class unbundle20(unpackermixin):
615 621 """interpret a bundle2 stream
616 622
617 623 This class is fed with a binary stream and yields parts through its
618 624 `iterparts` methods."""
619 625
620 626 def __init__(self, ui, fp):
621 627 """If header is specified, we do not read it out of the stream."""
622 628 self.ui = ui
623 629 super(unbundle20, self).__init__(fp)
624 630
625 631 @util.propertycache
626 632 def params(self):
627 633 """dictionary of stream level parameters"""
628 634 indebug(self.ui, 'reading bundle2 stream parameters')
629 635 params = {}
630 636 paramssize = self._unpack(_fstreamparamsize)[0]
631 637 if paramssize < 0:
632 638 raise error.BundleValueError('negative bundle param size: %i'
633 639 % paramssize)
634 640 if paramssize:
635 641 for p in self._readexact(paramssize).split(' '):
636 642 p = p.split('=', 1)
637 643 p = [urllib.unquote(i) for i in p]
638 644 if len(p) < 2:
639 645 p.append(None)
640 646 self._processparam(*p)
641 647 params[p[0]] = p[1]
642 648 return params
643 649
644 650 def _processparam(self, name, value):
645 651 """process a parameter, applying its effect if needed
646 652
647 653 Parameter starting with a lower case letter are advisory and will be
648 654 ignored when unknown. Those starting with an upper case letter are
649 655 mandatory and will this function will raise a KeyError when unknown.
650 656
651 657 Note: no option are currently supported. Any input will be either
652 658 ignored or failing.
653 659 """
654 660 if not name:
655 661 raise ValueError('empty parameter name')
656 662 if name[0] not in string.letters:
657 663 raise ValueError('non letter first character: %r' % name)
658 664 # Some logic will be later added here to try to process the option for
659 665 # a dict of known parameter.
660 666 if name[0].islower():
661 667 indebug(self.ui, "ignoring unknown parameter %r" % name)
662 668 else:
663 669 raise error.UnsupportedPartError(params=(name,))
664 670
665 671
666 672 def iterparts(self):
667 673 """yield all parts contained in the stream"""
668 674 # make sure param have been loaded
669 675 self.params
670 676 indebug(self.ui, 'start extraction of bundle2 parts')
671 677 headerblock = self._readpartheader()
672 678 while headerblock is not None:
673 679 part = unbundlepart(self.ui, headerblock, self._fp)
674 680 yield part
675 681 part.seek(0, 2)
676 682 headerblock = self._readpartheader()
677 683 indebug(self.ui, 'end of bundle2 stream')
678 684
679 685 def _readpartheader(self):
680 686 """reads a part header size and return the bytes blob
681 687
682 688 returns None if empty"""
683 689 headersize = self._unpack(_fpartheadersize)[0]
684 690 if headersize < 0:
685 691 raise error.BundleValueError('negative part header size: %i'
686 692 % headersize)
687 693 indebug(self.ui, 'part header size: %i' % headersize)
688 694 if headersize:
689 695 return self._readexact(headersize)
690 696 return None
691 697
692 698 def compressed(self):
693 699 return False
694 700
695 701 formatmap = {'20': unbundle20}
696 702
697 703 class bundlepart(object):
698 704 """A bundle2 part contains application level payload
699 705
700 706 The part `type` is used to route the part to the application level
701 707 handler.
702 708
703 709 The part payload is contained in ``part.data``. It could be raw bytes or a
704 710 generator of byte chunks.
705 711
706 712 You can add parameters to the part using the ``addparam`` method.
707 713 Parameters can be either mandatory (default) or advisory. Remote side
708 714 should be able to safely ignore the advisory ones.
709 715
710 716 Both data and parameters cannot be modified after the generation has begun.
711 717 """
712 718
713 719 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
714 720 data='', mandatory=True):
715 721 validateparttype(parttype)
716 722 self.id = None
717 723 self.type = parttype
718 724 self._data = data
719 725 self._mandatoryparams = list(mandatoryparams)
720 726 self._advisoryparams = list(advisoryparams)
721 727 # checking for duplicated entries
722 728 self._seenparams = set()
723 729 for pname, __ in self._mandatoryparams + self._advisoryparams:
724 730 if pname in self._seenparams:
725 731 raise RuntimeError('duplicated params: %s' % pname)
726 732 self._seenparams.add(pname)
727 733 # status of the part's generation:
728 734 # - None: not started,
729 735 # - False: currently generated,
730 736 # - True: generation done.
731 737 self._generated = None
732 738 self.mandatory = mandatory
733 739
734 740 def copy(self):
735 741 """return a copy of the part
736 742
737 743 The new part have the very same content but no partid assigned yet.
738 744 Parts with generated data cannot be copied."""
739 745 assert not util.safehasattr(self.data, 'next')
740 746 return self.__class__(self.type, self._mandatoryparams,
741 747 self._advisoryparams, self._data, self.mandatory)
742 748
743 749 # methods used to defines the part content
744 750 def __setdata(self, data):
745 751 if self._generated is not None:
746 752 raise error.ReadOnlyPartError('part is being generated')
747 753 self._data = data
748 754 def __getdata(self):
749 755 return self._data
750 756 data = property(__getdata, __setdata)
751 757
752 758 @property
753 759 def mandatoryparams(self):
754 760 # make it an immutable tuple to force people through ``addparam``
755 761 return tuple(self._mandatoryparams)
756 762
757 763 @property
758 764 def advisoryparams(self):
759 765 # make it an immutable tuple to force people through ``addparam``
760 766 return tuple(self._advisoryparams)
761 767
762 768 def addparam(self, name, value='', mandatory=True):
763 769 if self._generated is not None:
764 770 raise error.ReadOnlyPartError('part is being generated')
765 771 if name in self._seenparams:
766 772 raise ValueError('duplicated params: %s' % name)
767 773 self._seenparams.add(name)
768 774 params = self._advisoryparams
769 775 if mandatory:
770 776 params = self._mandatoryparams
771 777 params.append((name, value))
772 778
773 779 # methods used to generates the bundle2 stream
774 780 def getchunks(self, ui):
775 781 if self._generated is not None:
776 782 raise RuntimeError('part can only be consumed once')
777 783 self._generated = False
778 784
779 785 if ui.debugflag:
780 786 msg = ['bundle2-output-part: "%s"' % self.type]
781 787 if not self.mandatory:
782 788 msg.append(' (advisory)')
783 789 nbmp = len(self.mandatoryparams)
784 790 nbap = len(self.advisoryparams)
785 791 if nbmp or nbap:
786 792 msg.append(' (params:')
787 793 if nbmp:
788 794 msg.append(' %i mandatory' % nbmp)
789 795 if nbap:
790 796 msg.append(' %i advisory' % nbmp)
791 797 msg.append(')')
792 798 if not self.data:
793 799 msg.append(' empty payload')
794 800 elif util.safehasattr(self.data, 'next'):
795 801 msg.append(' streamed payload')
796 802 else:
797 803 msg.append(' %i bytes payload' % len(self.data))
798 804 msg.append('\n')
799 805 ui.debug(''.join(msg))
800 806
801 807 #### header
802 808 if self.mandatory:
803 809 parttype = self.type.upper()
804 810 else:
805 811 parttype = self.type.lower()
806 812 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
807 813 ## parttype
808 814 header = [_pack(_fparttypesize, len(parttype)),
809 815 parttype, _pack(_fpartid, self.id),
810 816 ]
811 817 ## parameters
812 818 # count
813 819 manpar = self.mandatoryparams
814 820 advpar = self.advisoryparams
815 821 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
816 822 # size
817 823 parsizes = []
818 824 for key, value in manpar:
819 825 parsizes.append(len(key))
820 826 parsizes.append(len(value))
821 827 for key, value in advpar:
822 828 parsizes.append(len(key))
823 829 parsizes.append(len(value))
824 830 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
825 831 header.append(paramsizes)
826 832 # key, value
827 833 for key, value in manpar:
828 834 header.append(key)
829 835 header.append(value)
830 836 for key, value in advpar:
831 837 header.append(key)
832 838 header.append(value)
833 839 ## finalize header
834 840 headerchunk = ''.join(header)
835 841 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
836 842 yield _pack(_fpartheadersize, len(headerchunk))
837 843 yield headerchunk
838 844 ## payload
839 845 try:
840 846 for chunk in self._payloadchunks():
841 847 outdebug(ui, 'payload chunk size: %i' % len(chunk))
842 848 yield _pack(_fpayloadsize, len(chunk))
843 849 yield chunk
844 850 except BaseException as exc:
845 851 # backup exception data for later
846 852 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
847 853 % exc)
848 854 exc_info = sys.exc_info()
849 855 msg = 'unexpected error: %s' % exc
850 856 interpart = bundlepart('error:abort', [('message', msg)],
851 857 mandatory=False)
852 858 interpart.id = 0
853 859 yield _pack(_fpayloadsize, -1)
854 860 for chunk in interpart.getchunks(ui=ui):
855 861 yield chunk
856 862 outdebug(ui, 'closing payload chunk')
857 863 # abort current part payload
858 864 yield _pack(_fpayloadsize, 0)
859 865 raise exc_info[0], exc_info[1], exc_info[2]
860 866 # end of payload
861 867 outdebug(ui, 'closing payload chunk')
862 868 yield _pack(_fpayloadsize, 0)
863 869 self._generated = True
864 870
865 871 def _payloadchunks(self):
866 872 """yield chunks of a the part payload
867 873
868 874 Exists to handle the different methods to provide data to a part."""
869 875 # we only support fixed size data now.
870 876 # This will be improved in the future.
871 877 if util.safehasattr(self.data, 'next'):
872 878 buff = util.chunkbuffer(self.data)
873 879 chunk = buff.read(preferedchunksize)
874 880 while chunk:
875 881 yield chunk
876 882 chunk = buff.read(preferedchunksize)
877 883 elif len(self.data):
878 884 yield self.data
879 885
880 886
881 887 flaginterrupt = -1
882 888
883 889 class interrupthandler(unpackermixin):
884 890 """read one part and process it with restricted capability
885 891
886 892 This allows to transmit exception raised on the producer size during part
887 893 iteration while the consumer is reading a part.
888 894
889 895 Part processed in this manner only have access to a ui object,"""
890 896
891 897 def __init__(self, ui, fp):
892 898 super(interrupthandler, self).__init__(fp)
893 899 self.ui = ui
894 900
895 901 def _readpartheader(self):
896 902 """reads a part header size and return the bytes blob
897 903
898 904 returns None if empty"""
899 905 headersize = self._unpack(_fpartheadersize)[0]
900 906 if headersize < 0:
901 907 raise error.BundleValueError('negative part header size: %i'
902 908 % headersize)
903 909 indebug(self.ui, 'part header size: %i\n' % headersize)
904 910 if headersize:
905 911 return self._readexact(headersize)
906 912 return None
907 913
908 914 def __call__(self):
909 915
910 916 self.ui.debug('bundle2-input-stream-interrupt:'
911 917 ' opening out of band context\n')
912 918 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
913 919 headerblock = self._readpartheader()
914 920 if headerblock is None:
915 921 indebug(self.ui, 'no part found during interruption.')
916 922 return
917 923 part = unbundlepart(self.ui, headerblock, self._fp)
918 924 op = interruptoperation(self.ui)
919 925 _processpart(op, part)
920 926 self.ui.debug('bundle2-input-stream-interrupt:'
921 927 ' closing out of band context\n')
922 928
923 929 class interruptoperation(object):
924 930 """A limited operation to be use by part handler during interruption
925 931
926 932 It only have access to an ui object.
927 933 """
928 934
929 935 def __init__(self, ui):
930 936 self.ui = ui
931 937 self.reply = None
932 938 self.captureoutput = False
933 939
934 940 @property
935 941 def repo(self):
936 942 raise RuntimeError('no repo access from stream interruption')
937 943
938 944 def gettransaction(self):
939 945 raise TransactionUnavailable('no repo access from stream interruption')
940 946
941 947 class unbundlepart(unpackermixin):
942 948 """a bundle part read from a bundle"""
943 949
944 950 def __init__(self, ui, header, fp):
945 951 super(unbundlepart, self).__init__(fp)
946 952 self.ui = ui
947 953 # unbundle state attr
948 954 self._headerdata = header
949 955 self._headeroffset = 0
950 956 self._initialized = False
951 957 self.consumed = False
952 958 # part data
953 959 self.id = None
954 960 self.type = None
955 961 self.mandatoryparams = None
956 962 self.advisoryparams = None
957 963 self.params = None
958 964 self.mandatorykeys = ()
959 965 self._payloadstream = None
960 966 self._readheader()
961 967 self._mandatory = None
962 968 self._chunkindex = [] #(payload, file) position tuples for chunk starts
963 969 self._pos = 0
964 970
965 971 def _fromheader(self, size):
966 972 """return the next <size> byte from the header"""
967 973 offset = self._headeroffset
968 974 data = self._headerdata[offset:(offset + size)]
969 975 self._headeroffset = offset + size
970 976 return data
971 977
972 978 def _unpackheader(self, format):
973 979 """read given format from header
974 980
975 981 This automatically compute the size of the format to read."""
976 982 data = self._fromheader(struct.calcsize(format))
977 983 return _unpack(format, data)
978 984
979 985 def _initparams(self, mandatoryparams, advisoryparams):
980 986 """internal function to setup all logic related parameters"""
981 987 # make it read only to prevent people touching it by mistake.
982 988 self.mandatoryparams = tuple(mandatoryparams)
983 989 self.advisoryparams = tuple(advisoryparams)
984 990 # user friendly UI
985 991 self.params = dict(self.mandatoryparams)
986 992 self.params.update(dict(self.advisoryparams))
987 993 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
988 994
989 995 def _payloadchunks(self, chunknum=0):
990 996 '''seek to specified chunk and start yielding data'''
991 997 if len(self._chunkindex) == 0:
992 998 assert chunknum == 0, 'Must start with chunk 0'
993 999 self._chunkindex.append((0, super(unbundlepart, self).tell()))
994 1000 else:
995 1001 assert chunknum < len(self._chunkindex), \
996 1002 'Unknown chunk %d' % chunknum
997 1003 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
998 1004
999 1005 pos = self._chunkindex[chunknum][0]
1000 1006 payloadsize = self._unpack(_fpayloadsize)[0]
1001 1007 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1002 1008 while payloadsize:
1003 1009 if payloadsize == flaginterrupt:
1004 1010 # interruption detection, the handler will now read a
1005 1011 # single part and process it.
1006 1012 interrupthandler(self.ui, self._fp)()
1007 1013 elif payloadsize < 0:
1008 1014 msg = 'negative payload chunk size: %i' % payloadsize
1009 1015 raise error.BundleValueError(msg)
1010 1016 else:
1011 1017 result = self._readexact(payloadsize)
1012 1018 chunknum += 1
1013 1019 pos += payloadsize
1014 1020 if chunknum == len(self._chunkindex):
1015 1021 self._chunkindex.append((pos,
1016 1022 super(unbundlepart, self).tell()))
1017 1023 yield result
1018 1024 payloadsize = self._unpack(_fpayloadsize)[0]
1019 1025 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1020 1026
1021 1027 def _findchunk(self, pos):
1022 1028 '''for a given payload position, return a chunk number and offset'''
1023 1029 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1024 1030 if ppos == pos:
1025 1031 return chunk, 0
1026 1032 elif ppos > pos:
1027 1033 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1028 1034 raise ValueError('Unknown chunk')
1029 1035
1030 1036 def _readheader(self):
1031 1037 """read the header and setup the object"""
1032 1038 typesize = self._unpackheader(_fparttypesize)[0]
1033 1039 self.type = self._fromheader(typesize)
1034 1040 indebug(self.ui, 'part type: "%s"' % self.type)
1035 1041 self.id = self._unpackheader(_fpartid)[0]
1036 1042 indebug(self.ui, 'part id: "%s"' % self.id)
1037 1043 # extract mandatory bit from type
1038 1044 self.mandatory = (self.type != self.type.lower())
1039 1045 self.type = self.type.lower()
1040 1046 ## reading parameters
1041 1047 # param count
1042 1048 mancount, advcount = self._unpackheader(_fpartparamcount)
1043 1049 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1044 1050 # param size
1045 1051 fparamsizes = _makefpartparamsizes(mancount + advcount)
1046 1052 paramsizes = self._unpackheader(fparamsizes)
1047 1053 # make it a list of couple again
1048 1054 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1049 1055 # split mandatory from advisory
1050 1056 mansizes = paramsizes[:mancount]
1051 1057 advsizes = paramsizes[mancount:]
1052 1058 # retrieve param value
1053 1059 manparams = []
1054 1060 for key, value in mansizes:
1055 1061 manparams.append((self._fromheader(key), self._fromheader(value)))
1056 1062 advparams = []
1057 1063 for key, value in advsizes:
1058 1064 advparams.append((self._fromheader(key), self._fromheader(value)))
1059 1065 self._initparams(manparams, advparams)
1060 1066 ## part payload
1061 1067 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1062 1068 # we read the data, tell it
1063 1069 self._initialized = True
1064 1070
1065 1071 def read(self, size=None):
1066 1072 """read payload data"""
1067 1073 if not self._initialized:
1068 1074 self._readheader()
1069 1075 if size is None:
1070 1076 data = self._payloadstream.read()
1071 1077 else:
1072 1078 data = self._payloadstream.read(size)
1073 1079 self._pos += len(data)
1074 1080 if size is None or len(data) < size:
1075 1081 if not self.consumed and self._pos:
1076 1082 self.ui.debug('bundle2-input-part: total payload size %i\n'
1077 1083 % self._pos)
1078 1084 self.consumed = True
1079 1085 return data
1080 1086
1081 1087 def tell(self):
1082 1088 return self._pos
1083 1089
1084 1090 def seek(self, offset, whence=0):
1085 1091 if whence == 0:
1086 1092 newpos = offset
1087 1093 elif whence == 1:
1088 1094 newpos = self._pos + offset
1089 1095 elif whence == 2:
1090 1096 if not self.consumed:
1091 1097 self.read()
1092 1098 newpos = self._chunkindex[-1][0] - offset
1093 1099 else:
1094 1100 raise ValueError('Unknown whence value: %r' % (whence,))
1095 1101
1096 1102 if newpos > self._chunkindex[-1][0] and not self.consumed:
1097 1103 self.read()
1098 1104 if not 0 <= newpos <= self._chunkindex[-1][0]:
1099 1105 raise ValueError('Offset out of range')
1100 1106
1101 1107 if self._pos != newpos:
1102 1108 chunk, internaloffset = self._findchunk(newpos)
1103 1109 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1104 1110 adjust = self.read(internaloffset)
1105 1111 if len(adjust) != internaloffset:
1106 1112 raise util.Abort(_('Seek failed\n'))
1107 1113 self._pos = newpos
1108 1114
1109 1115 # These are only the static capabilities.
1110 1116 # Check the 'getrepocaps' function for the rest.
1111 1117 capabilities = {'HG20': (),
1112 1118 'error': ('abort', 'unsupportedcontent', 'pushraced',
1113 1119 'pushkey'),
1114 1120 'listkeys': (),
1115 1121 'pushkey': (),
1116 1122 'digests': tuple(sorted(util.DIGESTS.keys())),
1117 1123 'remote-changegroup': ('http', 'https'),
1118 1124 'hgtagsfnodes': (),
1119 1125 }
1120 1126
1121 1127 def getrepocaps(repo, allowpushback=False):
1122 1128 """return the bundle2 capabilities for a given repo
1123 1129
1124 1130 Exists to allow extensions (like evolution) to mutate the capabilities.
1125 1131 """
1126 1132 caps = capabilities.copy()
1127 1133 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1128 1134 if obsolete.isenabled(repo, obsolete.exchangeopt):
1129 1135 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1130 1136 caps['obsmarkers'] = supportedformat
1131 1137 if allowpushback:
1132 1138 caps['pushback'] = ()
1133 1139 return caps
1134 1140
1135 1141 def bundle2caps(remote):
1136 1142 """return the bundle capabilities of a peer as dict"""
1137 1143 raw = remote.capable('bundle2')
1138 1144 if not raw and raw != '':
1139 1145 return {}
1140 1146 capsblob = urllib.unquote(remote.capable('bundle2'))
1141 1147 return decodecaps(capsblob)
1142 1148
1143 1149 def obsmarkersversion(caps):
1144 1150 """extract the list of supported obsmarkers versions from a bundle2caps dict
1145 1151 """
1146 1152 obscaps = caps.get('obsmarkers', ())
1147 1153 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1148 1154
1149 1155 @parthandler('changegroup', ('version', 'nbchanges'))
1150 1156 def handlechangegroup(op, inpart):
1151 1157 """apply a changegroup part on the repo
1152 1158
1153 1159 This is a very early implementation that will massive rework before being
1154 1160 inflicted to any end-user.
1155 1161 """
1156 1162 # Make sure we trigger a transaction creation
1157 1163 #
1158 1164 # The addchangegroup function will get a transaction object by itself, but
1159 1165 # we need to make sure we trigger the creation of a transaction object used
1160 1166 # for the whole processing scope.
1161 1167 op.gettransaction()
1162 1168 unpackerversion = inpart.params.get('version', '01')
1163 1169 # We should raise an appropriate exception here
1164 1170 unpacker = changegroup.packermap[unpackerversion][1]
1165 1171 cg = unpacker(inpart, 'UN')
1166 1172 # the source and url passed here are overwritten by the one contained in
1167 1173 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1168 1174 nbchangesets = None
1169 1175 if 'nbchanges' in inpart.params:
1170 1176 nbchangesets = int(inpart.params.get('nbchanges'))
1171 1177 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2',
1172 1178 expectedtotal=nbchangesets)
1173 1179 op.records.add('changegroup', {'return': ret})
1174 1180 if op.reply is not None:
1175 1181 # This is definitely not the final form of this
1176 1182 # return. But one need to start somewhere.
1177 1183 part = op.reply.newpart('reply:changegroup', mandatory=False)
1178 1184 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1179 1185 part.addparam('return', '%i' % ret, mandatory=False)
1180 1186 assert not inpart.read()
1181 1187
1182 1188 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1183 1189 ['digest:%s' % k for k in util.DIGESTS.keys()])
1184 1190 @parthandler('remote-changegroup', _remotechangegroupparams)
1185 1191 def handleremotechangegroup(op, inpart):
1186 1192 """apply a bundle10 on the repo, given an url and validation information
1187 1193
1188 1194 All the information about the remote bundle to import are given as
1189 1195 parameters. The parameters include:
1190 1196 - url: the url to the bundle10.
1191 1197 - size: the bundle10 file size. It is used to validate what was
1192 1198 retrieved by the client matches the server knowledge about the bundle.
1193 1199 - digests: a space separated list of the digest types provided as
1194 1200 parameters.
1195 1201 - digest:<digest-type>: the hexadecimal representation of the digest with
1196 1202 that name. Like the size, it is used to validate what was retrieved by
1197 1203 the client matches what the server knows about the bundle.
1198 1204
1199 1205 When multiple digest types are given, all of them are checked.
1200 1206 """
1201 1207 try:
1202 1208 raw_url = inpart.params['url']
1203 1209 except KeyError:
1204 1210 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1205 1211 parsed_url = util.url(raw_url)
1206 1212 if parsed_url.scheme not in capabilities['remote-changegroup']:
1207 1213 raise util.Abort(_('remote-changegroup does not support %s urls') %
1208 1214 parsed_url.scheme)
1209 1215
1210 1216 try:
1211 1217 size = int(inpart.params['size'])
1212 1218 except ValueError:
1213 1219 raise util.Abort(_('remote-changegroup: invalid value for param "%s"')
1214 1220 % 'size')
1215 1221 except KeyError:
1216 1222 raise util.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1217 1223
1218 1224 digests = {}
1219 1225 for typ in inpart.params.get('digests', '').split():
1220 1226 param = 'digest:%s' % typ
1221 1227 try:
1222 1228 value = inpart.params[param]
1223 1229 except KeyError:
1224 1230 raise util.Abort(_('remote-changegroup: missing "%s" param') %
1225 1231 param)
1226 1232 digests[typ] = value
1227 1233
1228 1234 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1229 1235
1230 1236 # Make sure we trigger a transaction creation
1231 1237 #
1232 1238 # The addchangegroup function will get a transaction object by itself, but
1233 1239 # we need to make sure we trigger the creation of a transaction object used
1234 1240 # for the whole processing scope.
1235 1241 op.gettransaction()
1236 import exchange
1242 from . import exchange
1237 1243 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1238 1244 if not isinstance(cg, changegroup.cg1unpacker):
1239 1245 raise util.Abort(_('%s: not a bundle version 1.0') %
1240 1246 util.hidepassword(raw_url))
1241 1247 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
1242 1248 op.records.add('changegroup', {'return': ret})
1243 1249 if op.reply is not None:
1244 1250 # This is definitely not the final form of this
1245 1251 # return. But one need to start somewhere.
1246 1252 part = op.reply.newpart('reply:changegroup')
1247 1253 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1248 1254 part.addparam('return', '%i' % ret, mandatory=False)
1249 1255 try:
1250 1256 real_part.validate()
1251 1257 except util.Abort as e:
1252 1258 raise util.Abort(_('bundle at %s is corrupted:\n%s') %
1253 1259 (util.hidepassword(raw_url), str(e)))
1254 1260 assert not inpart.read()
1255 1261
1256 1262 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1257 1263 def handlereplychangegroup(op, inpart):
1258 1264 ret = int(inpart.params['return'])
1259 1265 replyto = int(inpart.params['in-reply-to'])
1260 1266 op.records.add('changegroup', {'return': ret}, replyto)
1261 1267
1262 1268 @parthandler('check:heads')
1263 1269 def handlecheckheads(op, inpart):
1264 1270 """check that head of the repo did not change
1265 1271
1266 1272 This is used to detect a push race when using unbundle.
1267 1273 This replaces the "heads" argument of unbundle."""
1268 1274 h = inpart.read(20)
1269 1275 heads = []
1270 1276 while len(h) == 20:
1271 1277 heads.append(h)
1272 1278 h = inpart.read(20)
1273 1279 assert not h
1274 1280 if heads != op.repo.heads():
1275 1281 raise error.PushRaced('repository changed while pushing - '
1276 1282 'please try again')
1277 1283
1278 1284 @parthandler('output')
1279 1285 def handleoutput(op, inpart):
1280 1286 """forward output captured on the server to the client"""
1281 1287 for line in inpart.read().splitlines():
1282 1288 op.ui.status(('remote: %s\n' % line))
1283 1289
1284 1290 @parthandler('replycaps')
1285 1291 def handlereplycaps(op, inpart):
1286 1292 """Notify that a reply bundle should be created
1287 1293
1288 1294 The payload contains the capabilities information for the reply"""
1289 1295 caps = decodecaps(inpart.read())
1290 1296 if op.reply is None:
1291 1297 op.reply = bundle20(op.ui, caps)
1292 1298
1293 1299 @parthandler('error:abort', ('message', 'hint'))
1294 1300 def handleerrorabort(op, inpart):
1295 1301 """Used to transmit abort error over the wire"""
1296 1302 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1297 1303
1298 1304 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1299 1305 'in-reply-to'))
1300 1306 def handleerrorpushkey(op, inpart):
1301 1307 """Used to transmit failure of a mandatory pushkey over the wire"""
1302 1308 kwargs = {}
1303 1309 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1304 1310 value = inpart.params.get(name)
1305 1311 if value is not None:
1306 1312 kwargs[name] = value
1307 1313 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1308 1314
1309 1315 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1310 1316 def handleerrorunsupportedcontent(op, inpart):
1311 1317 """Used to transmit unknown content error over the wire"""
1312 1318 kwargs = {}
1313 1319 parttype = inpart.params.get('parttype')
1314 1320 if parttype is not None:
1315 1321 kwargs['parttype'] = parttype
1316 1322 params = inpart.params.get('params')
1317 1323 if params is not None:
1318 1324 kwargs['params'] = params.split('\0')
1319 1325
1320 1326 raise error.UnsupportedPartError(**kwargs)
1321 1327
1322 1328 @parthandler('error:pushraced', ('message',))
1323 1329 def handleerrorpushraced(op, inpart):
1324 1330 """Used to transmit push race error over the wire"""
1325 1331 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1326 1332
1327 1333 @parthandler('listkeys', ('namespace',))
1328 1334 def handlelistkeys(op, inpart):
1329 1335 """retrieve pushkey namespace content stored in a bundle2"""
1330 1336 namespace = inpart.params['namespace']
1331 1337 r = pushkey.decodekeys(inpart.read())
1332 1338 op.records.add('listkeys', (namespace, r))
1333 1339
1334 1340 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1335 1341 def handlepushkey(op, inpart):
1336 1342 """process a pushkey request"""
1337 1343 dec = pushkey.decode
1338 1344 namespace = dec(inpart.params['namespace'])
1339 1345 key = dec(inpart.params['key'])
1340 1346 old = dec(inpart.params['old'])
1341 1347 new = dec(inpart.params['new'])
1342 1348 ret = op.repo.pushkey(namespace, key, old, new)
1343 1349 record = {'namespace': namespace,
1344 1350 'key': key,
1345 1351 'old': old,
1346 1352 'new': new}
1347 1353 op.records.add('pushkey', record)
1348 1354 if op.reply is not None:
1349 1355 rpart = op.reply.newpart('reply:pushkey')
1350 1356 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1351 1357 rpart.addparam('return', '%i' % ret, mandatory=False)
1352 1358 if inpart.mandatory and not ret:
1353 1359 kwargs = {}
1354 1360 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1355 1361 if key in inpart.params:
1356 1362 kwargs[key] = inpart.params[key]
1357 1363 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1358 1364
1359 1365 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1360 1366 def handlepushkeyreply(op, inpart):
1361 1367 """retrieve the result of a pushkey request"""
1362 1368 ret = int(inpart.params['return'])
1363 1369 partid = int(inpart.params['in-reply-to'])
1364 1370 op.records.add('pushkey', {'return': ret}, partid)
1365 1371
1366 1372 @parthandler('obsmarkers')
1367 1373 def handleobsmarker(op, inpart):
1368 1374 """add a stream of obsmarkers to the repo"""
1369 1375 tr = op.gettransaction()
1370 1376 markerdata = inpart.read()
1371 1377 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1372 1378 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1373 1379 % len(markerdata))
1374 1380 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1375 1381 if new:
1376 1382 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1377 1383 op.records.add('obsmarkers', {'new': new})
1378 1384 if op.reply is not None:
1379 1385 rpart = op.reply.newpart('reply:obsmarkers')
1380 1386 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1381 1387 rpart.addparam('new', '%i' % new, mandatory=False)
1382 1388
1383 1389
1384 1390 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1385 1391 def handleobsmarkerreply(op, inpart):
1386 1392 """retrieve the result of a pushkey request"""
1387 1393 ret = int(inpart.params['new'])
1388 1394 partid = int(inpart.params['in-reply-to'])
1389 1395 op.records.add('obsmarkers', {'new': ret}, partid)
1390 1396
1391 1397 @parthandler('hgtagsfnodes')
1392 1398 def handlehgtagsfnodes(op, inpart):
1393 1399 """Applies .hgtags fnodes cache entries to the local repo.
1394 1400
1395 1401 Payload is pairs of 20 byte changeset nodes and filenodes.
1396 1402 """
1397 1403 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1398 1404
1399 1405 count = 0
1400 1406 while True:
1401 1407 node = inpart.read(20)
1402 1408 fnode = inpart.read(20)
1403 1409 if len(node) < 20 or len(fnode) < 20:
1404 1410 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1405 1411 break
1406 1412 cache.setfnode(node, fnode)
1407 1413 count += 1
1408 1414
1409 1415 cache.write()
1410 1416 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
General Comments 0
You need to be logged in to leave comments. Login now