applybundle: take url as argument...
Pierre-Yves David -
r26795:dff05b3f default
@@ -1,1534 +1,1536 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters.
37 37 
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space-separated list of parameters. Parameters with a
44 44 value are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45 
46 46 Empty names are obviously forbidden.
47 47 
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application level options MUST go into a bundle2 part instead.
61 61
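For illustration, a minimal sketch of how such a parameter blob can be produced
and parsed (Python 2 ``urllib``, matching the implementation further below; the
helper names are made up)::

    import urllib

    def encodeparams(params):
        # params: sequence of (name, value-or-None) pairs
        blocks = []
        for name, value in params:
            name = urllib.quote(name)
            if value is not None:
                name = '%s=%s' % (name, urllib.quote(value))
            blocks.append(name)
        return ' '.join(blocks)

    def decodeparams(blob):
        params = {}
        for entry in blob.split(' '):
            pieces = [urllib.unquote(p) for p in entry.split('=', 1)]
            if len(pieces) < 2:
                pieces.append(None)
            params[pieces[0]] = pieces[1]
        return params

This mirrors what `bundle20._paramchunk` and `unbundle20._processallparams` do
below.
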
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type and the part parameters.
76 76 
77 77 The part type is used to route to an application level handler that can
78 78 interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing; the only such
129 129 case currently in place is -1 (`flaginterrupt`), which signals an interruption.
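To make the part layout concrete, here is a rough decoding sketch for a single
part (helper names and error handling are simplified; the real logic lives in
``unbundlepart`` below)::

    import struct

    def readexactly(fp, n):
        data = fp.read(n)
        assert len(data) == n
        return data

    def readpart(fp):
        headersize = struct.unpack('>i', readexactly(fp, 4))[0]
        if headersize == 0:
            return None                      # end of stream marker
        header = readexactly(fp, headersize)
        # header: type size, type, part id, param counts, param sizes, data
        typesize = struct.unpack_from('>B', header, 0)[0]
        parttype = header[1:1 + typesize]
        offset = 1 + typesize
        partid = struct.unpack_from('>I', header, offset)[0]
        offset += 4
        mancount, advcount = struct.unpack_from('>BB', header, offset)
        offset += 2
        sizes = struct.unpack_from('>' + 'BB' * (mancount + advcount),
                                   header, offset)
        offset += 2 * (mancount + advcount)
        params = []
        for keysize, valsize in zip(sizes[::2], sizes[1::2]):
            key = header[offset:offset + keysize]
            value = header[offset + keysize:offset + keysize + valsize]
            offset += keysize + valsize
            params.append((key, value))
        # payload: <chunksize><chunkdata> repeated until a zero size chunk
        payload = []
        chunksize = struct.unpack('>i', readexactly(fp, 4))[0]
        while chunksize:
            if chunksize < 0:
                raise ValueError('special chunk size not handled here: %i'
                                 % chunksize)
            payload.append(readexactly(fp, chunksize))
            chunksize = struct.unpack('>i', readexactly(fp, 4))[0]
        return (parttype, partid, params[:mancount], params[mancount:],
                ''.join(payload))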
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
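
In pseudo-code, the dispatch rules above boil down to something like the
following (a simplified sketch; the real implementation is ``_processpart``
below, which also keeps draining the stream on abort)::

    def dispatch(handlers, parttype, payload):
        mandatory = parttype != parttype.lower()   # any uppercase char
        handler = handlers.get(parttype.lower())   # case insensitive match
        if handler is None:
            if mandatory:
                raise KeyError('missing handler for mandatory part %s'
                               % parttype)
            return                                 # advisory part: ignore it
        handler(payload)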
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155 import urllib
156 156
157 157 from .i18n import _
158 158 from . import (
159 159 changegroup,
160 160 error,
161 161 obsolete,
162 162 pushkey,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 _pack = struct.pack
169 169 _unpack = struct.unpack
170 170
171 171 _fstreamparamsize = '>i'
172 172 _fpartheadersize = '>i'
173 173 _fparttypesize = '>B'
174 174 _fpartid = '>I'
175 175 _fpayloadsize = '>i'
176 176 _fpartparamcount = '>BB'
177 177
178 178 preferedchunksize = 4096
179 179
180 180 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
181 181
182 182 def outdebug(ui, message):
183 183 """debug regarding output stream (bundling)"""
184 184 if ui.configbool('devel', 'bundle2.debug', False):
185 185 ui.debug('bundle2-output: %s\n' % message)
186 186
187 187 def indebug(ui, message):
188 188 """debug on input stream (unbundling)"""
189 189 if ui.configbool('devel', 'bundle2.debug', False):
190 190 ui.debug('bundle2-input: %s\n' % message)
191 191
192 192 def validateparttype(parttype):
193 193 """raise ValueError if a parttype contains invalid character"""
194 194 if _parttypeforbidden.search(parttype):
195 195 raise ValueError(parttype)
196 196
197 197 def _makefpartparamsizes(nbparams):
198 198 """return a struct format to read part parameter sizes
199 199
200 200 The number of parameters is variable so we need to build that format
201 201 dynamically.
202 202 """
203 203 return '>'+('BB'*nbparams)
204 204
205 205 parthandlermapping = {}
206 206
207 207 def parthandler(parttype, params=()):
208 208 """decorator that registers a function as a bundle2 part handler
209 209
210 210 eg::
211 211
212 212 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
213 213 def myparttypehandler(...):
214 214 '''process a part of type "my part".'''
215 215 ...
216 216 """
217 217 validateparttype(parttype)
218 218 def _decorator(func):
219 219 lparttype = parttype.lower() # enforce lower case matching.
220 220 assert lparttype not in parthandlermapping
221 221 parthandlermapping[lparttype] = func
222 222 func.params = frozenset(params)
223 223 return func
224 224 return _decorator
225 225
226 226 class unbundlerecords(object):
227 227 """keep record of what happens during an unbundle
228 228 
229 229 New records are added using `records.add('cat', obj)`, where 'cat' is a
230 230 category of record and obj is an arbitrary object.
231 231
232 232 `records['cat']` will return all entries of this category 'cat'.
233 233
234 234 Iterating on the object itself will yield `('category', obj)` tuples
235 235 for all entries.
236 236
237 237 All iterations happen in chronological order.
238 238 """
239 239
240 240 def __init__(self):
241 241 self._categories = {}
242 242 self._sequences = []
243 243 self._replies = {}
244 244
245 245 def add(self, category, entry, inreplyto=None):
246 246 """add a new record of a given category.
247 247
248 248 The entry can then be retrieved in the list returned by
249 249 self['category']."""
250 250 self._categories.setdefault(category, []).append(entry)
251 251 self._sequences.append((category, entry))
252 252 if inreplyto is not None:
253 253 self.getreplies(inreplyto).add(category, entry)
254 254
255 255 def getreplies(self, partid):
256 256 """get the records that are replies to a specific part"""
257 257 return self._replies.setdefault(partid, unbundlerecords())
258 258
259 259 def __getitem__(self, cat):
260 260 return tuple(self._categories.get(cat, ()))
261 261
262 262 def __iter__(self):
263 263 return iter(self._sequences)
264 264
265 265 def __len__(self):
266 266 return len(self._sequences)
267 267
268 268 def __nonzero__(self):
269 269 return bool(self._sequences)
270 270
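# A minimal usage sketch of the records API described in the class docstring
# above (category names and entries are made up for illustration):

    records = unbundlerecords()
    records.add('changegroup', {'return': 1})
    records.add('pushkey', {'return': 0}, inreplyto=0)
    assert records['changegroup'] == ({'return': 1},)
    assert len(records.getreplies(0)) == 1
    assert [cat for cat, entry in records] == ['changegroup', 'pushkey']
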
271 271 class bundleoperation(object):
272 272 """an object that represents a single bundling process
273 273
274 274 Its purpose is to carry unbundle-related objects and states.
275 275
276 276 A new object should be created at the beginning of each bundle processing.
277 277 The object is to be returned by the processing function.
278 278
279 279 The object has very little content now; it will ultimately contain:
280 280 * an access to the repo the bundle is applied to,
281 281 * a ui object,
282 282 * a way to retrieve a transaction to add changes to the repo,
283 283 * a way to record the result of processing each part,
284 284 * a way to construct a bundle response when applicable.
285 285 """
286 286
287 287 def __init__(self, repo, transactiongetter, captureoutput=True):
288 288 self.repo = repo
289 289 self.ui = repo.ui
290 290 self.records = unbundlerecords()
291 291 self.gettransaction = transactiongetter
292 292 self.reply = None
293 293 self.captureoutput = captureoutput
294 294
295 295 class TransactionUnavailable(RuntimeError):
296 296 pass
297 297
298 298 def _notransaction():
299 299 """default method to get a transaction while processing a bundle
300 300
301 301 Raise an exception to highlight the fact that no transaction was expected
302 302 to be created"""
303 303 raise TransactionUnavailable()
304 304
305 def applybundle(repo, unbundler, tr, source=None, op=None):
305 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
306 306 # transform me into unbundler.apply() as soon as the freeze is lifted
307 307 tr.hookargs['bundle2'] = '1'
308 308 if source is not None and 'source' not in tr.hookargs:
309 309 tr.hookargs['source'] = source
310 if url is not None and 'url' not in tr.hookargs:
311 tr.hookargs['url'] = url
310 312 return processbundle(repo, unbundler, lambda: tr, op=op)
311 313
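# With the new ``url`` argument, a caller can thread both the source and the
# url into the transaction hook arguments, roughly as follows (a sketch: the
# transaction idiom, the 'push' source and the url value are assumptions, and
# ``repo``/``unbundler`` come from the caller):

    tr = repo.transaction('unbundle')
    try:
        op = applybundle(repo, unbundler, tr,
                         source='push', url='ssh://example.org/repo')
        tr.close()
    finally:
        tr.release()
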
312 314 def processbundle(repo, unbundler, transactiongetter=None, op=None):
313 315 """This function processes a bundle, applying its effects to/from a repo
314 316
315 317 It iterates over each part then searches for and uses the proper handling
316 318 code to process the part. Parts are processed in order.
317 319
318 320 This is a very early version of this function that will be strongly reworked
319 321 before final usage.
320 322
321 323 Unknown mandatory parts will abort the process.
322 324
323 325 It is temporarily possible to provide a prebuilt bundleoperation to the
324 326 function. This is used to ensure output is properly propagated in case of
325 327 an error during the unbundling. This output capturing part will likely be
326 328 reworked and this ability will probably go away in the process.
327 329 """
328 330 if op is None:
329 331 if transactiongetter is None:
330 332 transactiongetter = _notransaction
331 333 op = bundleoperation(repo, transactiongetter)
332 334 # todo:
333 335 # - replace this with an init function soon.
334 336 # - exception catching
335 337 unbundler.params
336 338 if repo.ui.debugflag:
337 339 msg = ['bundle2-input-bundle:']
338 340 if unbundler.params:
339 341 msg.append(' %i params' % len(unbundler.params))
340 342 if op.gettransaction is None:
341 343 msg.append(' no-transaction')
342 344 else:
343 345 msg.append(' with-transaction')
344 346 msg.append('\n')
345 347 repo.ui.debug(''.join(msg))
346 348 iterparts = enumerate(unbundler.iterparts())
347 349 part = None
348 350 nbpart = 0
349 351 try:
350 352 for nbpart, part in iterparts:
351 353 _processpart(op, part)
352 354 except BaseException as exc:
353 355 for nbpart, part in iterparts:
354 356 # consume the bundle content
355 357 part.seek(0, 2)
356 358 # Small hack to let caller code distinguish exceptions from bundle2
357 359 # processing from processing the old format. This is mostly
358 360 # needed to handle different return codes to unbundle according to the
359 361 # type of bundle. We should probably clean up or drop this return code
360 362 # craziness in a future version.
361 363 exc.duringunbundle2 = True
362 364 salvaged = []
363 365 replycaps = None
364 366 if op.reply is not None:
365 367 salvaged = op.reply.salvageoutput()
366 368 replycaps = op.reply.capabilities
367 369 exc._replycaps = replycaps
368 370 exc._bundle2salvagedoutput = salvaged
369 371 raise
370 372 finally:
371 373 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
372 374
373 375 return op
374 376
375 377 def _processpart(op, part):
376 378 """process a single part from a bundle
377 379
378 380 The part is guaranteed to have been fully consumed when the function exits
379 381 (even if an exception is raised)."""
380 382 status = 'unknown' # used by debug output
381 383 try:
382 384 try:
383 385 handler = parthandlermapping.get(part.type)
384 386 if handler is None:
385 387 status = 'unsupported-type'
386 388 raise error.BundleUnknownFeatureError(parttype=part.type)
387 389 indebug(op.ui, 'found a handler for part %r' % part.type)
388 390 unknownparams = part.mandatorykeys - handler.params
389 391 if unknownparams:
390 392 unknownparams = list(unknownparams)
391 393 unknownparams.sort()
392 394 status = 'unsupported-params (%s)' % unknownparams
393 395 raise error.BundleUnknownFeatureError(parttype=part.type,
394 396 params=unknownparams)
395 397 status = 'supported'
396 398 except error.BundleUnknownFeatureError as exc:
397 399 if part.mandatory: # mandatory parts
398 400 raise
399 401 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
400 402 return # skip to part processing
401 403 finally:
402 404 if op.ui.debugflag:
403 405 msg = ['bundle2-input-part: "%s"' % part.type]
404 406 if not part.mandatory:
405 407 msg.append(' (advisory)')
406 408 nbmp = len(part.mandatorykeys)
407 409 nbap = len(part.params) - nbmp
408 410 if nbmp or nbap:
409 411 msg.append(' (params:')
410 412 if nbmp:
411 413 msg.append(' %i mandatory' % nbmp)
412 414 if nbap:
413 415 msg.append(' %i advisory' % nbap)
414 416 msg.append(')')
415 417 msg.append(' %s\n' % status)
416 418 op.ui.debug(''.join(msg))
417 419
418 420 # handler is called outside the above try block so that we don't
419 421 # risk catching KeyErrors from anything other than the
420 422 # parthandlermapping lookup (any KeyError raised by handler()
421 423 # itself represents a defect of a different variety).
422 424 output = None
423 425 if op.captureoutput and op.reply is not None:
424 426 op.ui.pushbuffer(error=True, subproc=True)
425 427 output = ''
426 428 try:
427 429 handler(op, part)
428 430 finally:
429 431 if output is not None:
430 432 output = op.ui.popbuffer()
431 433 if output:
432 434 outpart = op.reply.newpart('output', data=output,
433 435 mandatory=False)
434 436 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
435 437 finally:
436 438 # consume the part content to not corrupt the stream.
437 439 part.seek(0, 2)
438 440
439 441
440 442 def decodecaps(blob):
441 443 """decode a bundle2 caps bytes blob into a dictionary
442 444
443 445 The blob is a list of capabilities (one per line)
444 446 Capabilities may have values using a line of the form::
445 447
446 448 capability=value1,value2,value3
447 449
448 450 The values are always a list."""
449 451 caps = {}
450 452 for line in blob.splitlines():
451 453 if not line:
452 454 continue
453 455 if '=' not in line:
454 456 key, vals = line, ()
455 457 else:
456 458 key, vals = line.split('=', 1)
457 459 vals = vals.split(',')
458 460 key = urllib.unquote(key)
459 461 vals = [urllib.unquote(v) for v in vals]
460 462 caps[key] = vals
461 463 return caps
462 464
463 465 def encodecaps(caps):
464 466 """encode a bundle2 caps dictionary into a bytes blob"""
465 467 chunks = []
466 468 for ca in sorted(caps):
467 469 vals = caps[ca]
468 470 ca = urllib.quote(ca)
469 471 vals = [urllib.quote(v) for v in vals]
470 472 if vals:
471 473 ca = "%s=%s" % (ca, ','.join(vals))
472 474 chunks.append(ca)
473 475 return '\n'.join(chunks)
474 476
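# For instance, a capabilities dictionary survives a round trip through the
# blob format described in ``decodecaps`` (example values only):

    caps = {'HG20': [], 'digests': ['md5', 'sha1']}
    blob = encodecaps(caps)                 # 'HG20\ndigests=md5,sha1'
    assert decodecaps(blob) == caps
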
475 477 class bundle20(object):
476 478 """represent an outgoing bundle2 container
477 479
478 480 Use the `addparam` method to add a stream level parameter and `newpart` to
479 481 populate it. Then call `getchunks` to retrieve all the binary chunks of
480 482 data that compose the bundle2 container."""
481 483
482 484 _magicstring = 'HG20'
483 485
484 486 def __init__(self, ui, capabilities=()):
485 487 self.ui = ui
486 488 self._params = []
487 489 self._parts = []
488 490 self.capabilities = dict(capabilities)
489 491 self._compressor = util.compressors[None]()
490 492
491 493 def setcompression(self, alg):
492 494 """setup core part compression to <alg>"""
493 495 if alg is None:
494 496 return
495 497 assert not any(n.lower() == 'compression' for n, v in self._params)
496 498 self.addparam('Compression', alg)
497 499 self._compressor = util.compressors[alg]()
498 500
499 501 @property
500 502 def nbparts(self):
501 503 """total number of parts added to the bundler"""
502 504 return len(self._parts)
503 505
504 506 # methods used to define the bundle2 content
505 507 def addparam(self, name, value=None):
506 508 """add a stream level parameter"""
507 509 if not name:
508 510 raise ValueError('empty parameter name')
509 511 if name[0] not in string.letters:
510 512 raise ValueError('non letter first character: %r' % name)
511 513 self._params.append((name, value))
512 514
513 515 def addpart(self, part):
514 516 """add a new part to the bundle2 container
515 517
516 518 Parts contain the actual applicative payload.
517 519 assert part.id is None
518 520 part.id = len(self._parts) # very cheap counter
519 521 self._parts.append(part)
520 522
521 523 def newpart(self, typeid, *args, **kwargs):
522 524 """create a new part and add it to the containers
523 525
524 526 The part is directly added to the container. For now, this means that
525 527 any failure to properly initialize the part after calling ``newpart``
526 528 should result in a failure of the whole bundling process.
527 529 
528 530 You can still fall back to manually creating and adding the part if you
529 531 need better control.
530 532 part = bundlepart(typeid, *args, **kwargs)
531 533 self.addpart(part)
532 534 return part
533 535
534 536 # methods used to generate the bundle2 stream
535 537 def getchunks(self):
536 538 if self.ui.debugflag:
537 539 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
538 540 if self._params:
539 541 msg.append(' (%i params)' % len(self._params))
540 542 msg.append(' %i parts total\n' % len(self._parts))
541 543 self.ui.debug(''.join(msg))
542 544 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
543 545 yield self._magicstring
544 546 param = self._paramchunk()
545 547 outdebug(self.ui, 'bundle parameter: %s' % param)
546 548 yield _pack(_fstreamparamsize, len(param))
547 549 if param:
548 550 yield param
549 551 # starting compression
550 552 for chunk in self._getcorechunk():
551 553 yield self._compressor.compress(chunk)
552 554 yield self._compressor.flush()
553 555
554 556 def _paramchunk(self):
555 557 """return an encoded version of all stream parameters"""
556 558 blocks = []
557 559 for par, value in self._params:
558 560 par = urllib.quote(par)
559 561 if value is not None:
560 562 value = urllib.quote(value)
561 563 par = '%s=%s' % (par, value)
562 564 blocks.append(par)
563 565 return ' '.join(blocks)
564 566
565 567 def _getcorechunk(self):
566 568 """yield chunk for the core part of the bundle
567 569
568 570 (all but headers and parameters)"""
569 571 outdebug(self.ui, 'start of parts')
570 572 for part in self._parts:
571 573 outdebug(self.ui, 'bundle part: "%s"' % part.type)
572 574 for chunk in part.getchunks(ui=self.ui):
573 575 yield chunk
574 576 outdebug(self.ui, 'end of bundle')
575 577 yield _pack(_fpartheadersize, 0)
576 578
577 579
578 580 def salvageoutput(self):
579 581 """return a list with a copy of all output parts in the bundle
580 582
581 583 This is meant to be used during error handling to make sure we preserve
582 584 server output"""
583 585 salvaged = []
584 586 for part in self._parts:
585 587 if part.type.startswith('output'):
586 588 salvaged.append(part.copy())
587 589 return salvaged
588 590
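# Tying the pieces together, generating a bundle with this class follows the
# pattern from its docstring (a sketch: the ``ui`` object, the file name, the
# stream parameter and the part content are assumptions):

    bundler = bundle20(ui)
    bundler.addparam('simple', '1')          # advisory stream level parameter
    bundler.newpart('output', data='hello from the server\n', mandatory=False)
    with open('dump.hg20', 'wb') as fp:
        for chunk in bundler.getchunks():
            fp.write(chunk)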
589 591
590 592 class unpackermixin(object):
591 593 """A mixin to extract bytes and struct data from a stream"""
592 594
593 595 def __init__(self, fp):
594 596 self._fp = fp
595 597 self._seekable = (util.safehasattr(fp, 'seek') and
596 598 util.safehasattr(fp, 'tell'))
597 599
598 600 def _unpack(self, format):
599 601 """unpack this struct format from the stream"""
600 602 data = self._readexact(struct.calcsize(format))
601 603 return _unpack(format, data)
602 604
603 605 def _readexact(self, size):
604 606 """read exactly <size> bytes from the stream"""
605 607 return changegroup.readexactly(self._fp, size)
606 608
607 609 def seek(self, offset, whence=0):
608 610 """move the underlying file pointer"""
609 611 if self._seekable:
610 612 return self._fp.seek(offset, whence)
611 613 else:
612 614 raise NotImplementedError(_('File pointer is not seekable'))
613 615
614 616 def tell(self):
615 617 """return the file offset, or None if file is not seekable"""
616 618 if self._seekable:
617 619 try:
618 620 return self._fp.tell()
619 621 except IOError as e:
620 622 if e.errno == errno.ESPIPE:
621 623 self._seekable = False
622 624 else:
623 625 raise
624 626 return None
625 627
626 628 def close(self):
627 629 """close underlying file"""
628 630 if util.safehasattr(self._fp, 'close'):
629 631 return self._fp.close()
630 632
631 633 def getunbundler(ui, fp, magicstring=None):
632 634 """return a valid unbundler object for a given magicstring"""
633 635 if magicstring is None:
634 636 magicstring = changegroup.readexactly(fp, 4)
635 637 magic, version = magicstring[0:2], magicstring[2:4]
636 638 if magic != 'HG':
637 639 raise error.Abort(_('not a Mercurial bundle'))
638 640 unbundlerclass = formatmap.get(version)
639 641 if unbundlerclass is None:
640 642 raise error.Abort(_('unknown bundle version %s') % version)
641 643 unbundler = unbundlerclass(ui, fp)
642 644 indebug(ui, 'start processing of %s stream' % magicstring)
643 645 return unbundler
644 646
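# Reading a bundle back starts from ``getunbundler``, which checks the magic
# string and selects the right unbundler class (a sketch: ``ui`` and the file
# name are assumptions):

    fp = open('dump.hg20', 'rb')
    unbundler = getunbundler(ui, fp)         # aborts unless the magic is HG..
    for part in unbundler.iterparts():
        ui.write('%s: %i bytes of payload\n' % (part.type, len(part.read())))
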
645 647 class unbundle20(unpackermixin):
646 648 """interpret a bundle2 stream
647 649
648 650 This class is fed with a binary stream and yields parts through its
649 651 `iterparts` method."""
650 652
651 653 _magicstring = 'HG20'
652 654
653 655 def __init__(self, ui, fp):
654 656 """If header is specified, we do not read it out of the stream."""
655 657 self.ui = ui
656 658 self._decompressor = util.decompressors[None]
657 659 super(unbundle20, self).__init__(fp)
658 660
659 661 @util.propertycache
660 662 def params(self):
661 663 """dictionary of stream level parameters"""
662 664 indebug(self.ui, 'reading bundle2 stream parameters')
663 665 params = {}
664 666 paramssize = self._unpack(_fstreamparamsize)[0]
665 667 if paramssize < 0:
666 668 raise error.BundleValueError('negative bundle param size: %i'
667 669 % paramssize)
668 670 if paramssize:
669 671 params = self._readexact(paramssize)
670 672 params = self._processallparams(params)
671 673 return params
672 674
673 675 def _processallparams(self, paramsblock):
674 676 """process a blob of stream level parameters and return the parsed dict"""
675 677 params = {}
676 678 for p in paramsblock.split(' '):
677 679 p = p.split('=', 1)
678 680 p = [urllib.unquote(i) for i in p]
679 681 if len(p) < 2:
680 682 p.append(None)
681 683 self._processparam(*p)
682 684 params[p[0]] = p[1]
683 685 return params
684 686
685 687
686 688 def _processparam(self, name, value):
687 689 """process a parameter, applying its effect if needed
688 690
689 691 Parameters starting with a lower case letter are advisory and will be
690 692 ignored when unknown. Those starting with an upper case letter are
691 693 mandatory; this function will raise a BundleUnknownFeatureError for those.
692 694 
693 695 Note: no options are currently supported. Any input will either be
694 696 ignored or cause a failure.
695 697 """
696 698 if not name:
697 699 raise ValueError('empty parameter name')
698 700 if name[0] not in string.letters:
699 701 raise ValueError('non letter first character: %r' % name)
700 702 try:
701 703 handler = b2streamparamsmap[name.lower()]
702 704 except KeyError:
703 705 if name[0].islower():
704 706 indebug(self.ui, "ignoring unknown parameter %r" % name)
705 707 else:
706 708 raise error.BundleUnknownFeatureError(params=(name,))
707 709 else:
708 710 handler(self, name, value)
709 711
710 712 def _forwardchunks(self):
711 713 """utility to transfer a bundle2 as binary
712 714
713 715 This is made necessary by the fact that the 'getbundle' command over 'ssh'
714 716 has no way to know when the reply ends, relying on the bundle to be
715 717 interpreted to know its end. This is terrible and we are sorry, but we
716 718 needed to move forward to get general delta enabled.
717 719 """
718 720 yield self._magicstring
719 721 assert 'params' not in vars(self)
720 722 paramssize = self._unpack(_fstreamparamsize)[0]
721 723 if paramssize < 0:
722 724 raise error.BundleValueError('negative bundle param size: %i'
723 725 % paramssize)
724 726 yield _pack(_fstreamparamsize, paramssize)
725 727 if paramssize:
726 728 params = self._readexact(paramssize)
727 729 self._processallparams(params)
728 730 yield params
729 731 assert self._decompressor is util.decompressors[None]
730 732 # From there, payload might need to be decompressed
731 733 self._fp = self._decompressor(self._fp)
732 734 emptycount = 0
733 735 while emptycount < 2:
734 736 # so we can brainlessly loop
735 737 assert _fpartheadersize == _fpayloadsize
736 738 size = self._unpack(_fpartheadersize)[0]
737 739 yield _pack(_fpartheadersize, size)
738 740 if size:
739 741 emptycount = 0
740 742 else:
741 743 emptycount += 1
742 744 continue
743 745 if size == flaginterrupt:
744 746 continue
745 747 elif size < 0:
746 748 raise error.BundleValueError('negative chunk size: %i' % size)
747 749 yield self._readexact(size)
748 750
749 751
750 752 def iterparts(self):
751 753 """yield all parts contained in the stream"""
752 754 # make sure params have been loaded
753 755 self.params
754 756 # From there, payload need to be decompressed
755 757 self._fp = self._decompressor(self._fp)
756 758 indebug(self.ui, 'start extraction of bundle2 parts')
757 759 headerblock = self._readpartheader()
758 760 while headerblock is not None:
759 761 part = unbundlepart(self.ui, headerblock, self._fp)
760 762 yield part
761 763 part.seek(0, 2)
762 764 headerblock = self._readpartheader()
763 765 indebug(self.ui, 'end of bundle2 stream')
764 766
765 767 def _readpartheader(self):
766 768 """reads a part header size and return the bytes blob
767 769
768 770 returns None if empty"""
769 771 headersize = self._unpack(_fpartheadersize)[0]
770 772 if headersize < 0:
771 773 raise error.BundleValueError('negative part header size: %i'
772 774 % headersize)
773 775 indebug(self.ui, 'part header size: %i' % headersize)
774 776 if headersize:
775 777 return self._readexact(headersize)
776 778 return None
777 779
778 780 def compressed(self):
779 781 return False
780 782
781 783 formatmap = {'20': unbundle20}
782 784
783 785 b2streamparamsmap = {}
784 786
785 787 def b2streamparamhandler(name):
786 788 """register a handler for a stream level parameter"""
787 789 def decorator(func):
788 790 assert name not in formatmap
789 791 b2streamparamsmap[name] = func
790 792 return func
791 793 return decorator
792 794
793 795 @b2streamparamhandler('compression')
794 796 def processcompression(unbundler, param, value):
795 797 """read compression parameter and install payload decompression"""
796 798 if value not in util.decompressors:
797 799 raise error.BundleUnknownFeatureError(params=(param,),
798 800 values=(value,))
799 801 unbundler._decompressor = util.decompressors[value]
800 802
801 803 class bundlepart(object):
802 804 """A bundle2 part contains application level payload
803 805
804 806 The part `type` is used to route the part to the application level
805 807 handler.
806 808
807 809 The part payload is contained in ``part.data``. It could be raw bytes or a
808 810 generator of byte chunks.
809 811
810 812 You can add parameters to the part using the ``addparam`` method.
811 813 Parameters can be either mandatory (default) or advisory. Remote side
812 814 should be able to safely ignore the advisory ones.
813 815
814 816 Neither data nor parameters can be modified after generation has begun.
815 817 """
816 818
817 819 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
818 820 data='', mandatory=True):
819 821 validateparttype(parttype)
820 822 self.id = None
821 823 self.type = parttype
822 824 self._data = data
823 825 self._mandatoryparams = list(mandatoryparams)
824 826 self._advisoryparams = list(advisoryparams)
825 827 # checking for duplicated entries
826 828 self._seenparams = set()
827 829 for pname, __ in self._mandatoryparams + self._advisoryparams:
828 830 if pname in self._seenparams:
829 831 raise RuntimeError('duplicated params: %s' % pname)
830 832 self._seenparams.add(pname)
831 833 # status of the part's generation:
832 834 # - None: not started,
833 835 # - False: currently generated,
834 836 # - True: generation done.
835 837 self._generated = None
836 838 self.mandatory = mandatory
837 839
838 840 def copy(self):
839 841 """return a copy of the part
840 842
841 843 The new part has the very same content but no partid assigned yet.
842 844 Parts with generated data cannot be copied."""
843 845 assert not util.safehasattr(self.data, 'next')
844 846 return self.__class__(self.type, self._mandatoryparams,
845 847 self._advisoryparams, self._data, self.mandatory)
846 848
847 849 # methods used to define the part content
848 850 def __setdata(self, data):
849 851 if self._generated is not None:
850 852 raise error.ReadOnlyPartError('part is being generated')
851 853 self._data = data
852 854 def __getdata(self):
853 855 return self._data
854 856 data = property(__getdata, __setdata)
855 857
856 858 @property
857 859 def mandatoryparams(self):
858 860 # make it an immutable tuple to force people through ``addparam``
859 861 return tuple(self._mandatoryparams)
860 862
861 863 @property
862 864 def advisoryparams(self):
863 865 # make it an immutable tuple to force people through ``addparam``
864 866 return tuple(self._advisoryparams)
865 867
866 868 def addparam(self, name, value='', mandatory=True):
867 869 if self._generated is not None:
868 870 raise error.ReadOnlyPartError('part is being generated')
869 871 if name in self._seenparams:
870 872 raise ValueError('duplicated params: %s' % name)
871 873 self._seenparams.add(name)
872 874 params = self._advisoryparams
873 875 if mandatory:
874 876 params = self._mandatoryparams
875 877 params.append((name, value))
876 878
877 879 # methods used to generate the bundle2 stream
878 880 def getchunks(self, ui):
879 881 if self._generated is not None:
880 882 raise RuntimeError('part can only be consumed once')
881 883 self._generated = False
882 884
883 885 if ui.debugflag:
884 886 msg = ['bundle2-output-part: "%s"' % self.type]
885 887 if not self.mandatory:
886 888 msg.append(' (advisory)')
887 889 nbmp = len(self.mandatoryparams)
888 890 nbap = len(self.advisoryparams)
889 891 if nbmp or nbap:
890 892 msg.append(' (params:')
891 893 if nbmp:
892 894 msg.append(' %i mandatory' % nbmp)
893 895 if nbap:
894 896 msg.append(' %i advisory' % nbap)
895 897 msg.append(')')
896 898 if not self.data:
897 899 msg.append(' empty payload')
898 900 elif util.safehasattr(self.data, 'next'):
899 901 msg.append(' streamed payload')
900 902 else:
901 903 msg.append(' %i bytes payload' % len(self.data))
902 904 msg.append('\n')
903 905 ui.debug(''.join(msg))
904 906
905 907 #### header
906 908 if self.mandatory:
907 909 parttype = self.type.upper()
908 910 else:
909 911 parttype = self.type.lower()
910 912 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
911 913 ## parttype
912 914 header = [_pack(_fparttypesize, len(parttype)),
913 915 parttype, _pack(_fpartid, self.id),
914 916 ]
915 917 ## parameters
916 918 # count
917 919 manpar = self.mandatoryparams
918 920 advpar = self.advisoryparams
919 921 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
920 922 # size
921 923 parsizes = []
922 924 for key, value in manpar:
923 925 parsizes.append(len(key))
924 926 parsizes.append(len(value))
925 927 for key, value in advpar:
926 928 parsizes.append(len(key))
927 929 parsizes.append(len(value))
928 930 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
929 931 header.append(paramsizes)
930 932 # key, value
931 933 for key, value in manpar:
932 934 header.append(key)
933 935 header.append(value)
934 936 for key, value in advpar:
935 937 header.append(key)
936 938 header.append(value)
937 939 ## finalize header
938 940 headerchunk = ''.join(header)
939 941 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
940 942 yield _pack(_fpartheadersize, len(headerchunk))
941 943 yield headerchunk
942 944 ## payload
943 945 try:
944 946 for chunk in self._payloadchunks():
945 947 outdebug(ui, 'payload chunk size: %i' % len(chunk))
946 948 yield _pack(_fpayloadsize, len(chunk))
947 949 yield chunk
948 950 except GeneratorExit:
949 951 # GeneratorExit means that nobody is listening for our
950 952 # results anyway, so just bail quickly rather than trying
951 953 # to produce an error part.
952 954 ui.debug('bundle2-generatorexit\n')
953 955 raise
954 956 except BaseException as exc:
955 957 # backup exception data for later
956 958 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
957 959 % exc)
958 960 exc_info = sys.exc_info()
959 961 msg = 'unexpected error: %s' % exc
960 962 interpart = bundlepart('error:abort', [('message', msg)],
961 963 mandatory=False)
962 964 interpart.id = 0
963 965 yield _pack(_fpayloadsize, -1)
964 966 for chunk in interpart.getchunks(ui=ui):
965 967 yield chunk
966 968 outdebug(ui, 'closing payload chunk')
967 969 # abort current part payload
968 970 yield _pack(_fpayloadsize, 0)
969 971 raise exc_info[0], exc_info[1], exc_info[2]
970 972 # end of payload
971 973 outdebug(ui, 'closing payload chunk')
972 974 yield _pack(_fpayloadsize, 0)
973 975 self._generated = True
974 976
975 977 def _payloadchunks(self):
976 978 """yield chunks of the part payload
977 979
978 980 Exists to handle the different methods to provide data to a part."""
979 981 # we only support fixed size data now.
980 982 # This will be improved in the future.
981 983 if util.safehasattr(self.data, 'next'):
982 984 buff = util.chunkbuffer(self.data)
983 985 chunk = buff.read(preferedchunksize)
984 986 while chunk:
985 987 yield chunk
986 988 chunk = buff.read(preferedchunksize)
987 989 elif len(self.data):
988 990 yield self.data
989 991
990 992
991 993 flaginterrupt = -1
992 994
993 995 class interrupthandler(unpackermixin):
994 996 """read one part and process it with restricted capability
995 997
996 998 This allows transmitting exceptions raised on the producer side during
997 999 part iteration while the consumer is reading a part.
998 1000 
999 1001 Parts processed in this manner only have access to a ui object."""
1000 1002
1001 1003 def __init__(self, ui, fp):
1002 1004 super(interrupthandler, self).__init__(fp)
1003 1005 self.ui = ui
1004 1006
1005 1007 def _readpartheader(self):
1006 1008 """reads a part header size and return the bytes blob
1007 1009
1008 1010 returns None if empty"""
1009 1011 headersize = self._unpack(_fpartheadersize)[0]
1010 1012 if headersize < 0:
1011 1013 raise error.BundleValueError('negative part header size: %i'
1012 1014 % headersize)
1013 1015 indebug(self.ui, 'part header size: %i\n' % headersize)
1014 1016 if headersize:
1015 1017 return self._readexact(headersize)
1016 1018 return None
1017 1019
1018 1020 def __call__(self):
1019 1021
1020 1022 self.ui.debug('bundle2-input-stream-interrupt:'
1021 1023 ' opening out of band context\n')
1022 1024 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1023 1025 headerblock = self._readpartheader()
1024 1026 if headerblock is None:
1025 1027 indebug(self.ui, 'no part found during interruption.')
1026 1028 return
1027 1029 part = unbundlepart(self.ui, headerblock, self._fp)
1028 1030 op = interruptoperation(self.ui)
1029 1031 _processpart(op, part)
1030 1032 self.ui.debug('bundle2-input-stream-interrupt:'
1031 1033 ' closing out of band context\n')
1032 1034
1033 1035 class interruptoperation(object):
1034 1036 """A limited operation to be used by part handlers during interruption
1035 1037 
1036 1038 It only has access to a ui object.
1037 1039 """
1038 1040
1039 1041 def __init__(self, ui):
1040 1042 self.ui = ui
1041 1043 self.reply = None
1042 1044 self.captureoutput = False
1043 1045
1044 1046 @property
1045 1047 def repo(self):
1046 1048 raise RuntimeError('no repo access from stream interruption')
1047 1049
1048 1050 def gettransaction(self):
1049 1051 raise TransactionUnavailable('no repo access from stream interruption')
1050 1052
1051 1053 class unbundlepart(unpackermixin):
1052 1054 """a bundle part read from a bundle"""
1053 1055
1054 1056 def __init__(self, ui, header, fp):
1055 1057 super(unbundlepart, self).__init__(fp)
1056 1058 self.ui = ui
1057 1059 # unbundle state attr
1058 1060 self._headerdata = header
1059 1061 self._headeroffset = 0
1060 1062 self._initialized = False
1061 1063 self.consumed = False
1062 1064 # part data
1063 1065 self.id = None
1064 1066 self.type = None
1065 1067 self.mandatoryparams = None
1066 1068 self.advisoryparams = None
1067 1069 self.params = None
1068 1070 self.mandatorykeys = ()
1069 1071 self._payloadstream = None
1070 1072 self._readheader()
1071 1073 self._mandatory = None
1072 1074 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1073 1075 self._pos = 0
1074 1076
1075 1077 def _fromheader(self, size):
1076 1078 """return the next <size> byte from the header"""
1077 1079 offset = self._headeroffset
1078 1080 data = self._headerdata[offset:(offset + size)]
1079 1081 self._headeroffset = offset + size
1080 1082 return data
1081 1083
1082 1084 def _unpackheader(self, format):
1083 1085 """read given format from header
1084 1086
1085 1087 This automatically computes the size of the format to read.
1086 1088 data = self._fromheader(struct.calcsize(format))
1087 1089 return _unpack(format, data)
1088 1090
1089 1091 def _initparams(self, mandatoryparams, advisoryparams):
1090 1092 """internal function to setup all logic related parameters"""
1091 1093 # make it read only to prevent people touching it by mistake.
1092 1094 self.mandatoryparams = tuple(mandatoryparams)
1093 1095 self.advisoryparams = tuple(advisoryparams)
1094 1096 # user friendly UI
1095 1097 self.params = dict(self.mandatoryparams)
1096 1098 self.params.update(dict(self.advisoryparams))
1097 1099 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1098 1100
1099 1101 def _payloadchunks(self, chunknum=0):
1100 1102 '''seek to specified chunk and start yielding data'''
1101 1103 if len(self._chunkindex) == 0:
1102 1104 assert chunknum == 0, 'Must start with chunk 0'
1103 1105 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1104 1106 else:
1105 1107 assert chunknum < len(self._chunkindex), \
1106 1108 'Unknown chunk %d' % chunknum
1107 1109 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1108 1110
1109 1111 pos = self._chunkindex[chunknum][0]
1110 1112 payloadsize = self._unpack(_fpayloadsize)[0]
1111 1113 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1112 1114 while payloadsize:
1113 1115 if payloadsize == flaginterrupt:
1114 1116 # interruption detection, the handler will now read a
1115 1117 # single part and process it.
1116 1118 interrupthandler(self.ui, self._fp)()
1117 1119 elif payloadsize < 0:
1118 1120 msg = 'negative payload chunk size: %i' % payloadsize
1119 1121 raise error.BundleValueError(msg)
1120 1122 else:
1121 1123 result = self._readexact(payloadsize)
1122 1124 chunknum += 1
1123 1125 pos += payloadsize
1124 1126 if chunknum == len(self._chunkindex):
1125 1127 self._chunkindex.append((pos,
1126 1128 super(unbundlepart, self).tell()))
1127 1129 yield result
1128 1130 payloadsize = self._unpack(_fpayloadsize)[0]
1129 1131 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1130 1132
1131 1133 def _findchunk(self, pos):
1132 1134 '''for a given payload position, return a chunk number and offset'''
1133 1135 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1134 1136 if ppos == pos:
1135 1137 return chunk, 0
1136 1138 elif ppos > pos:
1137 1139 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1138 1140 raise ValueError('Unknown chunk')
1139 1141
1140 1142 def _readheader(self):
1141 1143 """read the header and setup the object"""
1142 1144 typesize = self._unpackheader(_fparttypesize)[0]
1143 1145 self.type = self._fromheader(typesize)
1144 1146 indebug(self.ui, 'part type: "%s"' % self.type)
1145 1147 self.id = self._unpackheader(_fpartid)[0]
1146 1148 indebug(self.ui, 'part id: "%s"' % self.id)
1147 1149 # extract mandatory bit from type
1148 1150 self.mandatory = (self.type != self.type.lower())
1149 1151 self.type = self.type.lower()
1150 1152 ## reading parameters
1151 1153 # param count
1152 1154 mancount, advcount = self._unpackheader(_fpartparamcount)
1153 1155 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1154 1156 # param size
1155 1157 fparamsizes = _makefpartparamsizes(mancount + advcount)
1156 1158 paramsizes = self._unpackheader(fparamsizes)
1157 1159 # make it a list of couple again
1158 1160 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1159 1161 # split mandatory from advisory
1160 1162 mansizes = paramsizes[:mancount]
1161 1163 advsizes = paramsizes[mancount:]
1162 1164 # retrieve param value
1163 1165 manparams = []
1164 1166 for key, value in mansizes:
1165 1167 manparams.append((self._fromheader(key), self._fromheader(value)))
1166 1168 advparams = []
1167 1169 for key, value in advsizes:
1168 1170 advparams.append((self._fromheader(key), self._fromheader(value)))
1169 1171 self._initparams(manparams, advparams)
1170 1172 ## part payload
1171 1173 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1172 1174 # we read the data, tell it
1173 1175 self._initialized = True
1174 1176
1175 1177 def read(self, size=None):
1176 1178 """read payload data"""
1177 1179 if not self._initialized:
1178 1180 self._readheader()
1179 1181 if size is None:
1180 1182 data = self._payloadstream.read()
1181 1183 else:
1182 1184 data = self._payloadstream.read(size)
1183 1185 self._pos += len(data)
1184 1186 if size is None or len(data) < size:
1185 1187 if not self.consumed and self._pos:
1186 1188 self.ui.debug('bundle2-input-part: total payload size %i\n'
1187 1189 % self._pos)
1188 1190 self.consumed = True
1189 1191 return data
1190 1192
1191 1193 def tell(self):
1192 1194 return self._pos
1193 1195
1194 1196 def seek(self, offset, whence=0):
1195 1197 if whence == 0:
1196 1198 newpos = offset
1197 1199 elif whence == 1:
1198 1200 newpos = self._pos + offset
1199 1201 elif whence == 2:
1200 1202 if not self.consumed:
1201 1203 self.read()
1202 1204 newpos = self._chunkindex[-1][0] - offset
1203 1205 else:
1204 1206 raise ValueError('Unknown whence value: %r' % (whence,))
1205 1207
1206 1208 if newpos > self._chunkindex[-1][0] and not self.consumed:
1207 1209 self.read()
1208 1210 if not 0 <= newpos <= self._chunkindex[-1][0]:
1209 1211 raise ValueError('Offset out of range')
1210 1212
1211 1213 if self._pos != newpos:
1212 1214 chunk, internaloffset = self._findchunk(newpos)
1213 1215 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1214 1216 adjust = self.read(internaloffset)
1215 1217 if len(adjust) != internaloffset:
1216 1218 raise error.Abort(_('Seek failed\n'))
1217 1219 self._pos = newpos
1218 1220
1219 1221 # These are only the static capabilities.
1220 1222 # Check the 'getrepocaps' function for the rest.
1221 1223 capabilities = {'HG20': (),
1222 1224 'error': ('abort', 'unsupportedcontent', 'pushraced',
1223 1225 'pushkey'),
1224 1226 'listkeys': (),
1225 1227 'pushkey': (),
1226 1228 'digests': tuple(sorted(util.DIGESTS.keys())),
1227 1229 'remote-changegroup': ('http', 'https'),
1228 1230 'hgtagsfnodes': (),
1229 1231 }
1230 1232
1231 1233 def getrepocaps(repo, allowpushback=False):
1232 1234 """return the bundle2 capabilities for a given repo
1233 1235
1234 1236 Exists to allow extensions (like evolution) to mutate the capabilities.
1235 1237 """
1236 1238 caps = capabilities.copy()
1237 1239 caps['changegroup'] = tuple(sorted(changegroup.packermap.keys()))
1238 1240 if obsolete.isenabled(repo, obsolete.exchangeopt):
1239 1241 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1240 1242 caps['obsmarkers'] = supportedformat
1241 1243 if allowpushback:
1242 1244 caps['pushback'] = ()
1243 1245 return caps
1244 1246
1245 1247 def bundle2caps(remote):
1246 1248 """return the bundle capabilities of a peer as dict"""
1247 1249 raw = remote.capable('bundle2')
1248 1250 if not raw and raw != '':
1249 1251 return {}
1250 1252 capsblob = urllib.unquote(remote.capable('bundle2'))
1251 1253 return decodecaps(capsblob)
1252 1254
1253 1255 def obsmarkersversion(caps):
1254 1256 """extract the list of supported obsmarkers versions from a bundle2caps dict
1255 1257 """
1256 1258 obscaps = caps.get('obsmarkers', ())
1257 1259 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1258 1260
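# For example, with a made-up caps dict:
#
#     assert obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1]
#     assert obsmarkersversion({}) == []
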
1259 1261 @parthandler('changegroup', ('version', 'nbchanges'))
1260 1262 def handlechangegroup(op, inpart):
1261 1263 """apply a changegroup part on the repo
1262 1264
1263 1265 This is a very early implementation that will be massively reworked before
1264 1266 being inflicted on any end-user.
1265 1267 """
1266 1268 # Make sure we trigger a transaction creation
1267 1269 #
1268 1270 # The addchangegroup function will get a transaction object by itself, but
1269 1271 # we need to make sure we trigger the creation of a transaction object used
1270 1272 # for the whole processing scope.
1271 1273 op.gettransaction()
1272 1274 unpackerversion = inpart.params.get('version', '01')
1273 1275 # We should raise an appropriate exception here
1274 1276 unpacker = changegroup.packermap[unpackerversion][1]
1275 1277 cg = unpacker(inpart, None)
1276 1278 # the source and url passed here are overwritten by the ones contained in
1277 1279 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1278 1280 nbchangesets = None
1279 1281 if 'nbchanges' in inpart.params:
1280 1282 nbchangesets = int(inpart.params.get('nbchanges'))
1281 1283 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1282 1284 op.records.add('changegroup', {'return': ret})
1283 1285 if op.reply is not None:
1284 1286 # This is definitely not the final form of this
1285 1287 # return. But one needs to start somewhere.
1286 1288 part = op.reply.newpart('reply:changegroup', mandatory=False)
1287 1289 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1288 1290 part.addparam('return', '%i' % ret, mandatory=False)
1289 1291 assert not inpart.read()
1290 1292
1291 1293 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1292 1294 ['digest:%s' % k for k in util.DIGESTS.keys()])
1293 1295 @parthandler('remote-changegroup', _remotechangegroupparams)
1294 1296 def handleremotechangegroup(op, inpart):
1295 1297 """apply a bundle10 on the repo, given a url and validation information
1296 1298 
1297 1299 All the information about the remote bundle to import is given as
1298 1300 parameters. The parameters include:
1299 1301 - url: the url to the bundle10.
1300 1302 - size: the bundle10 file size. It is used to validate what was
1301 1303 retrieved by the client matches the server knowledge about the bundle.
1302 1304 - digests: a space separated list of the digest types provided as
1303 1305 parameters.
1304 1306 - digest:<digest-type>: the hexadecimal representation of the digest with
1305 1307 that name. Like the size, it is used to validate what was retrieved by
1306 1308 the client matches what the server knows about the bundle.
1307 1309
1308 1310 When multiple digest types are given, all of them are checked.
1309 1311 """
1310 1312 try:
1311 1313 raw_url = inpart.params['url']
1312 1314 except KeyError:
1313 1315 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1314 1316 parsed_url = util.url(raw_url)
1315 1317 if parsed_url.scheme not in capabilities['remote-changegroup']:
1316 1318 raise error.Abort(_('remote-changegroup does not support %s urls') %
1317 1319 parsed_url.scheme)
1318 1320
1319 1321 try:
1320 1322 size = int(inpart.params['size'])
1321 1323 except ValueError:
1322 1324 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1323 1325 % 'size')
1324 1326 except KeyError:
1325 1327 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1326 1328
1327 1329 digests = {}
1328 1330 for typ in inpart.params.get('digests', '').split():
1329 1331 param = 'digest:%s' % typ
1330 1332 try:
1331 1333 value = inpart.params[param]
1332 1334 except KeyError:
1333 1335 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1334 1336 param)
1335 1337 digests[typ] = value
1336 1338
1337 1339 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1338 1340
1339 1341 # Make sure we trigger a transaction creation
1340 1342 #
1341 1343 # The addchangegroup function will get a transaction object by itself, but
1342 1344 # we need to make sure we trigger the creation of a transaction object used
1343 1345 # for the whole processing scope.
1344 1346 op.gettransaction()
1345 1347 from . import exchange
1346 1348 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1347 1349 if not isinstance(cg, changegroup.cg1unpacker):
1348 1350 raise error.Abort(_('%s: not a bundle version 1.0') %
1349 1351 util.hidepassword(raw_url))
1350 1352 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1351 1353 op.records.add('changegroup', {'return': ret})
1352 1354 if op.reply is not None:
1353 1355 # This is definitely not the final form of this
1354 1356 # return. But one needs to start somewhere.
1355 1357 part = op.reply.newpart('reply:changegroup')
1356 1358 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1357 1359 part.addparam('return', '%i' % ret, mandatory=False)
1358 1360 try:
1359 1361 real_part.validate()
1360 1362 except error.Abort as e:
1361 1363 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1362 1364 (util.hidepassword(raw_url), str(e)))
1363 1365 assert not inpart.read()
1364 1366
1365 1367 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1366 1368 def handlereplychangegroup(op, inpart):
1367 1369 ret = int(inpart.params['return'])
1368 1370 replyto = int(inpart.params['in-reply-to'])
1369 1371 op.records.add('changegroup', {'return': ret}, replyto)
1370 1372
1371 1373 @parthandler('check:heads')
1372 1374 def handlecheckheads(op, inpart):
1373 1375 """check that the heads of the repo did not change
1374 1376
1375 1377 This is used to detect a push race when using unbundle.
1376 1378 This replaces the "heads" argument of unbundle."""
1377 1379 h = inpart.read(20)
1378 1380 heads = []
1379 1381 while len(h) == 20:
1380 1382 heads.append(h)
1381 1383 h = inpart.read(20)
1382 1384 assert not h
1383 1385 # Trigger a transaction so that we are guaranteed to have the lock now.
1384 1386 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1385 1387 op.gettransaction()
1386 1388 if heads != op.repo.heads():
1387 1389 raise error.PushRaced('repository changed while pushing - '
1388 1390 'please try again')
1389 1391
1390 1392 @parthandler('output')
1391 1393 def handleoutput(op, inpart):
1392 1394 """forward output captured on the server to the client"""
1393 1395 for line in inpart.read().splitlines():
1394 1396 op.ui.status(('remote: %s\n' % line))
1395 1397
1396 1398 @parthandler('replycaps')
1397 1399 def handlereplycaps(op, inpart):
1398 1400 """Notify that a reply bundle should be created
1399 1401
1400 1402 The payload contains the capabilities information for the reply"""
1401 1403 caps = decodecaps(inpart.read())
1402 1404 if op.reply is None:
1403 1405 op.reply = bundle20(op.ui, caps)
1404 1406
1405 1407 @parthandler('error:abort', ('message', 'hint'))
1406 1408 def handleerrorabort(op, inpart):
1407 1409 """Used to transmit abort error over the wire"""
1408 1410 raise error.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
1409 1411
1410 1412 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1411 1413 'in-reply-to'))
1412 1414 def handleerrorpushkey(op, inpart):
1413 1415 """Used to transmit failure of a mandatory pushkey over the wire"""
1414 1416 kwargs = {}
1415 1417 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1416 1418 value = inpart.params.get(name)
1417 1419 if value is not None:
1418 1420 kwargs[name] = value
1419 1421 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1420 1422
1421 1423 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1422 1424 def handleerrorunsupportedcontent(op, inpart):
1423 1425 """Used to transmit unknown content error over the wire"""
1424 1426 kwargs = {}
1425 1427 parttype = inpart.params.get('parttype')
1426 1428 if parttype is not None:
1427 1429 kwargs['parttype'] = parttype
1428 1430 params = inpart.params.get('params')
1429 1431 if params is not None:
1430 1432 kwargs['params'] = params.split('\0')
1431 1433
1432 1434 raise error.BundleUnknownFeatureError(**kwargs)
1433 1435
1434 1436 @parthandler('error:pushraced', ('message',))
1435 1437 def handleerrorpushraced(op, inpart):
1436 1438 """Used to transmit push race error over the wire"""
1437 1439 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1438 1440
1439 1441 @parthandler('listkeys', ('namespace',))
1440 1442 def handlelistkeys(op, inpart):
1441 1443 """retrieve pushkey namespace content stored in a bundle2"""
1442 1444 namespace = inpart.params['namespace']
1443 1445 r = pushkey.decodekeys(inpart.read())
1444 1446 op.records.add('listkeys', (namespace, r))
1445 1447
1446 1448 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1447 1449 def handlepushkey(op, inpart):
1448 1450 """process a pushkey request"""
1449 1451 dec = pushkey.decode
1450 1452 namespace = dec(inpart.params['namespace'])
1451 1453 key = dec(inpart.params['key'])
1452 1454 old = dec(inpart.params['old'])
1453 1455 new = dec(inpart.params['new'])
1454 1456 # Grab the transaction to ensure that we have the lock before performing the
1455 1457 # pushkey.
1456 1458 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1457 1459 op.gettransaction()
1458 1460 ret = op.repo.pushkey(namespace, key, old, new)
1459 1461 record = {'namespace': namespace,
1460 1462 'key': key,
1461 1463 'old': old,
1462 1464 'new': new}
1463 1465 op.records.add('pushkey', record)
1464 1466 if op.reply is not None:
1465 1467 rpart = op.reply.newpart('reply:pushkey')
1466 1468 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1467 1469 rpart.addparam('return', '%i' % ret, mandatory=False)
1468 1470 if inpart.mandatory and not ret:
1469 1471 kwargs = {}
1470 1472 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1471 1473 if key in inpart.params:
1472 1474 kwargs[key] = inpart.params[key]
1473 1475 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1474 1476
1475 1477 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1476 1478 def handlepushkeyreply(op, inpart):
1477 1479 """retrieve the result of a pushkey request"""
1478 1480 ret = int(inpart.params['return'])
1479 1481 partid = int(inpart.params['in-reply-to'])
1480 1482 op.records.add('pushkey', {'return': ret}, partid)
1481 1483
1482 1484 @parthandler('obsmarkers')
1483 1485 def handleobsmarker(op, inpart):
1484 1486 """add a stream of obsmarkers to the repo"""
1485 1487 tr = op.gettransaction()
1486 1488 markerdata = inpart.read()
1487 1489 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1488 1490 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1489 1491 % len(markerdata))
1490 1492 # The mergemarkers call will crash if marker creation is not enabled.
1491 1493 # we want to avoid this if the part is advisory.
1492 1494 if not inpart.mandatory and op.repo.obsstore.readonly:
1493 1495 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled')
1494 1496 return
1495 1497 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1496 1498 if new:
1497 1499 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1498 1500 op.records.add('obsmarkers', {'new': new})
1499 1501 if op.reply is not None:
1500 1502 rpart = op.reply.newpart('reply:obsmarkers')
1501 1503 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1502 1504 rpart.addparam('new', '%i' % new, mandatory=False)
1503 1505
1504 1506
1505 1507 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1506 1508 def handleobsmarkerreply(op, inpart):
1507 1509 """retrieve the result of an obsmarkers request"""
1508 1510 ret = int(inpart.params['new'])
1509 1511 partid = int(inpart.params['in-reply-to'])
1510 1512 op.records.add('obsmarkers', {'new': ret}, partid)
1511 1513
1512 1514 @parthandler('hgtagsfnodes')
1513 1515 def handlehgtagsfnodes(op, inpart):
1514 1516 """Applies .hgtags fnodes cache entries to the local repo.
1515 1517
1516 1518 Payload is pairs of 20 byte changeset nodes and filenodes.
1517 1519 """
1518 1520 # Grab the transaction so we ensure that we have the lock at this point.
1519 1521 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1520 1522 op.gettransaction()
1521 1523 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1522 1524
1523 1525 count = 0
1524 1526 while True:
1525 1527 node = inpart.read(20)
1526 1528 fnode = inpart.read(20)
1527 1529 if len(node) < 20 or len(fnode) < 20:
1528 1530 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1529 1531 break
1530 1532 cache.setfnode(node, fnode)
1531 1533 count += 1
1532 1534
1533 1535 cache.write()
1534 1536 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)