bundle2: move 'seek' and 'tell' methods off the unpackermixin class...
Pierre-Yves David -
r31889:a02e7730 default
@@ -1,1645 +1,1656 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic container to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is architected as follows
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39 
40 40 A blob of `params size` bytes containing the serialized version of all stream
41 41 level parameters.
42 42
43 43 The blob contains a space separated list of parameters. Parameters with value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are obviously forbidden.
47 47
48 48 Names MUST start with a letter. If the first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allow easy human inspection of a bundle2 header in case of
58 58 troubles.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
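The textual stream-parameter encoding described above can be sketched in Python. This is a simplified illustration of the rules (url-quoting, `<name>=<value>`, space separation, capitalization marking mandatory parameters); `encodestreamparams` is a hypothetical helper name, not part of the module:

```python
import urllib.parse

def encodestreamparams(params):
    # params: iterable of (name, value) pairs; value may be None.
    # Each name and value is url-quoted; a parameter carrying a value
    # is rendered as <name>=<value>; the blob is space separated.
    blocks = []
    for name, value in params:
        chunk = urllib.parse.quote(name)
        if value is not None:
            chunk += '=' + urllib.parse.quote(value)
        blocks.append(chunk)
    return ' '.join(blocks)

# 'Compression' starts with a capital letter, so a reader MUST understand
# it; 'hint' is advisory and may be safely ignored.
blob = encodestreamparams([('Compression', 'BZ'), ('hint', None)])
```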
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type and the part parameters.
76 76 
77 77 The part type is used to route the part to an application level handler
78 78 that can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application level object to
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 Part's parameter may have arbitrary content, the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size couples stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
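The `<mandatory-count><advisory-count><param-sizes><param-data>` layout can be made concrete with a small decoder. This is an illustrative sketch only; `parsepartparams` is a hypothetical name, not the module's actual parsing code:

```python
import struct

def parsepartparams(data):
    # <mandatory-count> and <advisory-count> are one unsigned byte each
    mancount, advcount = struct.unpack_from('>BB', data, 0)
    total = mancount + advcount
    # N pairs of (key size, value size), one unsigned byte each
    sizes = struct.unpack_from('>' + 'BB' * total, data, 2)
    offset = 2 + 2 * total
    params = []
    for i in range(total):
        ksize, vsize = sizes[2 * i], sizes[2 * i + 1]
        key = data[offset:offset + ksize]
        offset += ksize
        value = data[offset:offset + vsize]
        offset += vsize
        params.append((key, value))
    # mandatory parameters come first, then the advisory ones
    return params[:mancount], params[mancount:]
```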
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
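The `<chunksize><chunkdata>` framing with its zero-size terminator is easy to sketch. A minimal illustration (ignoring the negative-size special cases, which the docstring notes are not in use yet); `framepayload` is a hypothetical name:

```python
import struct

def framepayload(chunks):
    # each chunk is emitted as <chunksize (big-endian int32)><chunkdata>;
    # a zero-size chunk concludes the payload
    out = []
    for chunk in chunks:
        out.append(struct.pack('>i', len(chunk)))
        out.append(chunk)
    out.append(struct.pack('>i', 0))
    return b''.join(out)
```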
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase char it is considered mandatory. When no handler is
140 140 known for a mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable. But none of the parts read after an abort are processed. In
144 144 the future, dropping the stream may become an option for channels we do not
145 145 care to preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 pushkey,
162 162 pycompat,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 urlerr = util.urlerr
169 169 urlreq = util.urlreq
170 170
171 171 _pack = struct.pack
172 172 _unpack = struct.unpack
173 173
174 174 _fstreamparamsize = '>i'
175 175 _fpartheadersize = '>i'
176 176 _fparttypesize = '>B'
177 177 _fpartid = '>I'
178 178 _fpayloadsize = '>i'
179 179 _fpartparamcount = '>BB'
180 180
181 181 preferedchunksize = 4096
182 182
183 183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184 184
185 185 def outdebug(ui, message):
186 186 """debug regarding output stream (bundling)"""
187 187 if ui.configbool('devel', 'bundle2.debug', False):
188 188 ui.debug('bundle2-output: %s\n' % message)
189 189
190 190 def indebug(ui, message):
191 191 """debug on input stream (unbundling)"""
192 192 if ui.configbool('devel', 'bundle2.debug', False):
193 193 ui.debug('bundle2-input: %s\n' % message)
194 194
195 195 def validateparttype(parttype):
196 196 """raise ValueError if a parttype contains invalid character"""
197 197 if _parttypeforbidden.search(parttype):
198 198 raise ValueError(parttype)
199 199
200 200 def _makefpartparamsizes(nbparams):
201 201 """return a struct format to read part parameter sizes
202 202
203 203 The number of parameters is variable so we need to build that format
204 204 dynamically.
205 205 """
206 206 return '>'+('BB'*nbparams)
207 207
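For instance, with three parameters the dynamically-built format describes six big-endian unsigned bytes. A standalone transliteration of the helper above:

```python
import struct

def makefpartparamsizes(nbparams):
    # one unsigned byte for each key size plus one for each value
    # size, per parameter, all big-endian ('>')
    return '>' + ('BB' * nbparams)
```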
208 208 parthandlermapping = {}
209 209
210 210 def parthandler(parttype, params=()):
211 211 """decorator that register a function as a bundle2 part handler
212 212
213 213 eg::
214 214
215 215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 216 def myparttypehandler(...):
217 217 '''process a part of type "my part".'''
218 218 ...
219 219 """
220 220 validateparttype(parttype)
221 221 def _decorator(func):
222 222 lparttype = parttype.lower() # enforce lower case matching.
223 223 assert lparttype not in parthandlermapping
224 224 parthandlermapping[lparttype] = func
225 225 func.params = frozenset(params)
226 226 return func
227 227 return _decorator
228 228
229 229 class unbundlerecords(object):
230 230 """keep record of what happens during and unbundle
231 231
232 232 New records are added using `records.add('cat', obj)`. Where 'cat' is a
233 233 category of record and obj is an arbitrary object.
234 234
235 235 `records['cat']` will return all entries of this category 'cat'.
236 236
237 237 Iterating on the object itself will yield `('category', obj)` tuples
238 238 for all entries.
239 239
240 240 All iterations happen in chronological order.
241 241 """
242 242
243 243 def __init__(self):
244 244 self._categories = {}
245 245 self._sequences = []
246 246 self._replies = {}
247 247
248 248 def add(self, category, entry, inreplyto=None):
249 249 """add a new record of a given category.
250 250
251 251 The entry can then be retrieved in the list returned by
252 252 self['category']."""
253 253 self._categories.setdefault(category, []).append(entry)
254 254 self._sequences.append((category, entry))
255 255 if inreplyto is not None:
256 256 self.getreplies(inreplyto).add(category, entry)
257 257
258 258 def getreplies(self, partid):
259 259 """get the records that are replies to a specific part"""
260 260 return self._replies.setdefault(partid, unbundlerecords())
261 261
262 262 def __getitem__(self, cat):
263 263 return tuple(self._categories.get(cat, ()))
264 264
265 265 def __iter__(self):
266 266 return iter(self._sequences)
267 267
268 268 def __len__(self):
269 269 return len(self._sequences)
270 270
271 271 def __nonzero__(self):
272 272 return bool(self._sequences)
273 273
274 274 __bool__ = __nonzero__
275 275
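The semantics documented in the `unbundlerecords` docstring (per-category access plus chronological iteration) can be exercised with a stripped-down sketch. This is a minimal reimplementation for illustration, not the class above (it omits the reply-tracking machinery):

```python
class simplerecords:
    """Minimal sketch of the unbundlerecords access patterns:
    entries are grouped by category and also kept in insertion order."""

    def __init__(self):
        self._categories = {}
        self._sequences = []

    def add(self, category, entry):
        # record under its category and in the chronological sequence
        self._categories.setdefault(category, []).append(entry)
        self._sequences.append((category, entry))

    def __getitem__(self, cat):
        # records['cat'] returns all entries of that category
        return tuple(self._categories.get(cat, ()))

    def __iter__(self):
        # iterating yields ('category', entry) tuples, oldest first
        return iter(self._sequences)

records = simplerecords()
records.add('changegroup', {'return': 1})
records.add('pushkey', {'ok': True})
```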
276 276 class bundleoperation(object):
277 277 """an object that represents a single bundling process
278 278
279 279 Its purpose is to carry unbundle-related objects and states.
280 280
281 281 A new object should be created at the beginning of each bundle processing.
282 282 The object is to be returned by the processing function.
283 283
284 284 The object has very little content now; it will ultimately contain:
285 285 * an access to the repo the bundle is applied to,
286 286 * a ui object,
287 287 * a way to retrieve a transaction to add changes to the repo,
288 288 * a way to record the result of processing each part,
289 289 * a way to construct a bundle response when applicable.
290 290 """
291 291
292 292 def __init__(self, repo, transactiongetter, captureoutput=True):
293 293 self.repo = repo
294 294 self.ui = repo.ui
295 295 self.records = unbundlerecords()
296 296 self.gettransaction = transactiongetter
297 297 self.reply = None
298 298 self.captureoutput = captureoutput
299 299
300 300 class TransactionUnavailable(RuntimeError):
301 301 pass
302 302
303 303 def _notransaction():
304 304 """default method to get a transaction while processing a bundle
305 305
306 306 Raise an exception to highlight the fact that no transaction was expected
307 307 to be created"""
308 308 raise TransactionUnavailable()
309 309
310 310 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
311 311 # transform me into unbundler.apply() as soon as the freeze is lifted
312 312 tr.hookargs['bundle2'] = '1'
313 313 if source is not None and 'source' not in tr.hookargs:
314 314 tr.hookargs['source'] = source
315 315 if url is not None and 'url' not in tr.hookargs:
316 316 tr.hookargs['url'] = url
317 317 return processbundle(repo, unbundler, lambda: tr, op=op)
318 318
319 319 def processbundle(repo, unbundler, transactiongetter=None, op=None):
320 320 """This function process a bundle, apply effect to/from a repo
321 321
322 322 It iterates over each part then searches for and uses the proper handling
323 323 code to process the part. Parts are processed in order.
324 324
325 325 An unknown mandatory part will abort the process.
326 326
327 327 It is temporarily possible to provide a prebuilt bundleoperation to the
328 328 function. This is used to ensure output is properly propagated in case of
329 329 an error during the unbundling. This output capturing part will likely be
330 330 reworked and this ability will probably go away in the process.
331 331 """
332 332 if op is None:
333 333 if transactiongetter is None:
334 334 transactiongetter = _notransaction
335 335 op = bundleoperation(repo, transactiongetter)
336 336 # todo:
337 337 # - replace this with an init function soon.
338 338 # - exception catching
339 339 unbundler.params
340 340 if repo.ui.debugflag:
341 341 msg = ['bundle2-input-bundle:']
342 342 if unbundler.params:
343 343 msg.append(' %i params' % len(unbundler.params))
344 344 if op.gettransaction is None:
345 345 msg.append(' no-transaction')
346 346 else:
347 347 msg.append(' with-transaction')
348 348 msg.append('\n')
349 349 repo.ui.debug(''.join(msg))
350 350 iterparts = enumerate(unbundler.iterparts())
351 351 part = None
352 352 nbpart = 0
353 353 try:
354 354 for nbpart, part in iterparts:
355 355 _processpart(op, part)
356 356 except Exception as exc:
357 357 for nbpart, part in iterparts:
358 358 # consume the bundle content
359 359 part.seek(0, 2)
360 360 # Small hack to let caller code distinguish exceptions from bundle2
361 361 # processing from processing the old format. This is mostly
362 362 # needed to handle different return codes to unbundle according to the
363 363 # type of bundle. We should probably clean up or drop this return code
364 364 # craziness in a future version.
365 365 exc.duringunbundle2 = True
366 366 salvaged = []
367 367 replycaps = None
368 368 if op.reply is not None:
369 369 salvaged = op.reply.salvageoutput()
370 370 replycaps = op.reply.capabilities
371 371 exc._replycaps = replycaps
372 372 exc._bundle2salvagedoutput = salvaged
373 373 raise
374 374 finally:
375 375 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
376 376
377 377 return op
378 378
379 379 def _processpart(op, part):
380 380 """process a single part from a bundle
381 381
382 382 The part is guaranteed to have been fully consumed when the function exits
383 383 (even if an exception is raised)."""
384 384 status = 'unknown' # used by debug output
385 385 hardabort = False
386 386 try:
387 387 try:
388 388 handler = parthandlermapping.get(part.type)
389 389 if handler is None:
390 390 status = 'unsupported-type'
391 391 raise error.BundleUnknownFeatureError(parttype=part.type)
392 392 indebug(op.ui, 'found a handler for part %r' % part.type)
393 393 unknownparams = part.mandatorykeys - handler.params
394 394 if unknownparams:
395 395 unknownparams = list(unknownparams)
396 396 unknownparams.sort()
397 397 status = 'unsupported-params (%s)' % unknownparams
398 398 raise error.BundleUnknownFeatureError(parttype=part.type,
399 399 params=unknownparams)
400 400 status = 'supported'
401 401 except error.BundleUnknownFeatureError as exc:
402 402 if part.mandatory: # mandatory parts
403 403 raise
404 404 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
405 405 return # skip to part processing
406 406 finally:
407 407 if op.ui.debugflag:
408 408 msg = ['bundle2-input-part: "%s"' % part.type]
409 409 if not part.mandatory:
410 410 msg.append(' (advisory)')
411 411 nbmp = len(part.mandatorykeys)
412 412 nbap = len(part.params) - nbmp
413 413 if nbmp or nbap:
414 414 msg.append(' (params:')
415 415 if nbmp:
416 416 msg.append(' %i mandatory' % nbmp)
417 417 if nbap:
418 418 msg.append(' %i advisory' % nbap)
419 419 msg.append(')')
420 420 msg.append(' %s\n' % status)
421 421 op.ui.debug(''.join(msg))
422 422
423 423 # handler is called outside the above try block so that we don't
424 424 # risk catching KeyErrors from anything other than the
425 425 # parthandlermapping lookup (any KeyError raised by handler()
426 426 # itself represents a defect of a different variety).
427 427 output = None
428 428 if op.captureoutput and op.reply is not None:
429 429 op.ui.pushbuffer(error=True, subproc=True)
430 430 output = ''
431 431 try:
432 432 handler(op, part)
433 433 finally:
434 434 if output is not None:
435 435 output = op.ui.popbuffer()
436 436 if output:
437 437 outpart = op.reply.newpart('output', data=output,
438 438 mandatory=False)
439 439 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
440 440 # If exiting or interrupted, do not attempt to seek the stream in the
441 441 # finally block below. This makes abort faster.
442 442 except (SystemExit, KeyboardInterrupt):
443 443 hardabort = True
444 444 raise
445 445 finally:
446 446 # consume the part content to not corrupt the stream.
447 447 if not hardabort:
448 448 part.seek(0, 2)
449 449
450 450
451 451 def decodecaps(blob):
452 452 """decode a bundle2 caps bytes blob into a dictionary
453 453
454 454 The blob is a list of capabilities (one per line)
455 455 Capabilities may have values using a line of the form::
456 456
457 457 capability=value1,value2,value3
458 458
459 459 The values are always a list."""
460 460 caps = {}
461 461 for line in blob.splitlines():
462 462 if not line:
463 463 continue
464 464 if '=' not in line:
465 465 key, vals = line, ()
466 466 else:
467 467 key, vals = line.split('=', 1)
468 468 vals = vals.split(',')
469 469 key = urlreq.unquote(key)
470 470 vals = [urlreq.unquote(v) for v in vals]
471 471 caps[key] = vals
472 472 return caps
473 473
474 474 def encodecaps(caps):
475 475 """encode a bundle2 caps dictionary into a bytes blob"""
476 476 chunks = []
477 477 for ca in sorted(caps):
478 478 vals = caps[ca]
479 479 ca = urlreq.quote(ca)
480 480 vals = [urlreq.quote(v) for v in vals]
481 481 if vals:
482 482 ca = "%s=%s" % (ca, ','.join(vals))
483 483 chunks.append(ca)
484 484 return '\n'.join(chunks)
485 485
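The `decodecaps`/`encodecaps` pair round-trips cleanly. A Python 3 transliteration for reference (using `urllib.parse` directly where the module goes through its `urlreq` compatibility alias); a sketch, not the module's actual code:

```python
import urllib.parse

def encodecaps(caps):
    # one capability per line, sorted; values joined with ','
    chunks = []
    for ca in sorted(caps):
        vals = caps[ca]
        ca = urllib.parse.quote(ca)
        vals = [urllib.parse.quote(v) for v in vals]
        if vals:
            ca = '%s=%s' % (ca, ','.join(vals))
        chunks.append(ca)
    return '\n'.join(chunks)

def decodecaps(blob):
    # inverse operation: values always come back as a list
    caps = {}
    for line in blob.splitlines():
        if not line:
            continue
        if '=' not in line:
            key, vals = line, ()
        else:
            key, vals = line.split('=', 1)
            vals = vals.split(',')
        key = urllib.parse.unquote(key)
        vals = [urllib.parse.unquote(v) for v in vals]
        caps[key] = vals
    return caps
```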
486 486 bundletypes = {
487 487 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
488 488 # since the unification ssh accepts a header but there
489 489 # is no capability signaling it.
490 490 "HG20": (), # special-cased below
491 491 "HG10UN": ("HG10UN", 'UN'),
492 492 "HG10BZ": ("HG10", 'BZ'),
493 493 "HG10GZ": ("HG10GZ", 'GZ'),
494 494 }
495 495
496 496 # hgweb uses this list to communicate its preferred type
497 497 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
498 498
499 499 class bundle20(object):
500 500 """represent an outgoing bundle2 container
501 501
502 502 Use the `addparam` method to add a stream level parameter, and `newpart` to
503 503 populate it. Then call `getchunks` to retrieve all the binary chunks of
504 504 data that compose the bundle2 container."""
505 505
506 506 _magicstring = 'HG20'
507 507
508 508 def __init__(self, ui, capabilities=()):
509 509 self.ui = ui
510 510 self._params = []
511 511 self._parts = []
512 512 self.capabilities = dict(capabilities)
513 513 self._compengine = util.compengines.forbundletype('UN')
514 514 self._compopts = None
515 515
516 516 def setcompression(self, alg, compopts=None):
517 517 """setup core part compression to <alg>"""
518 518 if alg in (None, 'UN'):
519 519 return
520 520 assert not any(n.lower() == 'compression' for n, v in self._params)
521 521 self.addparam('Compression', alg)
522 522 self._compengine = util.compengines.forbundletype(alg)
523 523 self._compopts = compopts
524 524
525 525 @property
526 526 def nbparts(self):
527 527 """total number of parts added to the bundler"""
528 528 return len(self._parts)
529 529
530 530 # methods used to define the bundle2 content
531 531 def addparam(self, name, value=None):
532 532 """add a stream level parameter"""
533 533 if not name:
534 534 raise ValueError('empty parameter name')
535 535 if name[0] not in string.letters:
536 536 raise ValueError('non letter first character: %r' % name)
537 537 self._params.append((name, value))
538 538
539 539 def addpart(self, part):
540 540 """add a new part to the bundle2 container
541 541
542 542 Parts contain the actual applicative payload.
543 543 assert part.id is None
544 544 part.id = len(self._parts) # very cheap counter
545 545 self._parts.append(part)
546 546
547 547 def newpart(self, typeid, *args, **kwargs):
548 548 """create a new part and add it to the containers
549 549
550 550 As the part is directly added to the containers. For now, this means
551 551 that any failure to properly initialize the part after calling
552 552 ``newpart`` should result in a failure of the whole bundling process.
553 553
554 554 You can still fall back to manually create and add if you need better
555 555 control."""
556 556 part = bundlepart(typeid, *args, **kwargs)
557 557 self.addpart(part)
558 558 return part
559 559
560 560 # methods used to generate the bundle2 stream
561 561 def getchunks(self):
562 562 if self.ui.debugflag:
563 563 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
564 564 if self._params:
565 565 msg.append(' (%i params)' % len(self._params))
566 566 msg.append(' %i parts total\n' % len(self._parts))
567 567 self.ui.debug(''.join(msg))
568 568 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
569 569 yield self._magicstring
570 570 param = self._paramchunk()
571 571 outdebug(self.ui, 'bundle parameter: %s' % param)
572 572 yield _pack(_fstreamparamsize, len(param))
573 573 if param:
574 574 yield param
575 575 for chunk in self._compengine.compressstream(self._getcorechunk(),
576 576 self._compopts):
577 577 yield chunk
578 578
579 579 def _paramchunk(self):
580 580 """return a encoded version of all stream parameters"""
581 581 blocks = []
582 582 for par, value in self._params:
583 583 par = urlreq.quote(par)
584 584 if value is not None:
585 585 value = urlreq.quote(value)
586 586 par = '%s=%s' % (par, value)
587 587 blocks.append(par)
588 588 return ' '.join(blocks)
589 589
590 590 def _getcorechunk(self):
591 591 """yield chunk for the core part of the bundle
592 592
593 593 (all but headers and parameters)"""
594 594 outdebug(self.ui, 'start of parts')
595 595 for part in self._parts:
596 596 outdebug(self.ui, 'bundle part: "%s"' % part.type)
597 597 for chunk in part.getchunks(ui=self.ui):
598 598 yield chunk
599 599 outdebug(self.ui, 'end of bundle')
600 600 yield _pack(_fpartheadersize, 0)
601 601
602 602
603 603 def salvageoutput(self):
604 604 """return a list with a copy of all output parts in the bundle
605 605
606 606 This is meant to be used during error handling to make sure we preserve
607 607 server output"""
608 608 salvaged = []
609 609 for part in self._parts:
610 610 if part.type.startswith('output'):
611 611 salvaged.append(part.copy())
612 612 return salvaged
613 613
614 614
615 615 class unpackermixin(object):
616 616 """A mixin to extract bytes and struct data from a stream"""
617 617
618 618 def __init__(self, fp):
619 619 self._fp = fp
620 self._seekable = (util.safehasattr(fp, 'seek') and
621 util.safehasattr(fp, 'tell'))
622 620
623 621 def _unpack(self, format):
624 622 """unpack this struct format from the stream
625 623
626 624 This method is meant for internal usage by the bundle2 protocol only.
627 625 They directly manipulate the low level stream including bundle2 level
628 626 instruction.
629 627
630 628 Do not use it to implement higher-level logic or methods."""
631 629 data = self._readexact(struct.calcsize(format))
632 630 return _unpack(format, data)
633 631
634 632 def _readexact(self, size):
635 633 """read exactly <size> bytes from the stream
636 634
637 635 This method is meant for internal usage by the bundle2 protocol only.
638 636 They directly manipulate the low level stream including bundle2 level
639 637 instruction.
640 638
641 639 Do not use it to implement higher-level logic or methods."""
642 640 return changegroup.readexactly(self._fp, size)
643 641
644 def seek(self, offset, whence=0):
645 """move the underlying file pointer"""
646 if self._seekable:
647 return self._fp.seek(offset, whence)
648 else:
649 raise NotImplementedError(_('File pointer is not seekable'))
650
651 def tell(self):
652 """return the file offset, or None if file is not seekable"""
653 if self._seekable:
654 try:
655 return self._fp.tell()
656 except IOError as e:
657 if e.errno == errno.ESPIPE:
658 self._seekable = False
659 else:
660 raise
661 return None
662
663 642 def getunbundler(ui, fp, magicstring=None):
664 643 """return a valid unbundler object for a given magicstring"""
665 644 if magicstring is None:
666 645 magicstring = changegroup.readexactly(fp, 4)
667 646 magic, version = magicstring[0:2], magicstring[2:4]
668 647 if magic != 'HG':
669 648 raise error.Abort(_('not a Mercurial bundle'))
670 649 unbundlerclass = formatmap.get(version)
671 650 if unbundlerclass is None:
672 651 raise error.Abort(_('unknown bundle version %s') % version)
673 652 unbundler = unbundlerclass(ui, fp)
674 653 indebug(ui, 'start processing of %s stream' % magicstring)
675 654 return unbundler
676 655
677 656 class unbundle20(unpackermixin):
678 657 """interpret a bundle2 stream
679 658
680 659 This class is fed with a binary stream and yields parts through its
681 660 `iterparts` methods."""
682 661
683 662 _magicstring = 'HG20'
684 663
685 664 def __init__(self, ui, fp):
686 665 """If header is specified, we do not read it out of the stream."""
687 666 self.ui = ui
688 667 self._compengine = util.compengines.forbundletype('UN')
689 668 self._compressed = None
690 669 super(unbundle20, self).__init__(fp)
691 670
692 671 @util.propertycache
693 672 def params(self):
694 673 """dictionary of stream level parameters"""
695 674 indebug(self.ui, 'reading bundle2 stream parameters')
696 675 params = {}
697 676 paramssize = self._unpack(_fstreamparamsize)[0]
698 677 if paramssize < 0:
699 678 raise error.BundleValueError('negative bundle param size: %i'
700 679 % paramssize)
701 680 if paramssize:
702 681 params = self._readexact(paramssize)
703 682 params = self._processallparams(params)
704 683 return params
705 684
706 685 def _processallparams(self, paramsblock):
707 686 """"""
708 687 params = util.sortdict()
709 688 for p in paramsblock.split(' '):
710 689 p = p.split('=', 1)
711 690 p = [urlreq.unquote(i) for i in p]
712 691 if len(p) < 2:
713 692 p.append(None)
714 693 self._processparam(*p)
715 694 params[p[0]] = p[1]
716 695 return params
717 696
718 697
719 698 def _processparam(self, name, value):
720 699 """process a parameter, applying its effect if needed
721 700
722 701 Parameters starting with a lower case letter are advisory and will be
723 702 ignored when unknown. For those starting with an upper case letter,
724 703 this function will raise a KeyError when unknown.
725 704 
726 705 Note: no options are currently supported. Any input will either be
727 706 ignored or fail.
728 707 """
729 708 if not name:
730 709 raise ValueError('empty parameter name')
731 710 if name[0] not in string.letters:
732 711 raise ValueError('non letter first character: %r' % name)
733 712 try:
734 713 handler = b2streamparamsmap[name.lower()]
735 714 except KeyError:
736 715 if name[0].islower():
737 716 indebug(self.ui, "ignoring unknown parameter %r" % name)
738 717 else:
739 718 raise error.BundleUnknownFeatureError(params=(name,))
740 719 else:
741 720 handler(self, name, value)
742 721
743 722 def _forwardchunks(self):
744 723 """utility to transfer a bundle2 as binary
745 724
746 725 This is made necessary by the fact that the 'getbundle' command over 'ssh'
747 726 has no way to know when the reply ends, relying on the bundle being
748 727 interpreted to know its end. This is terrible and we are sorry, but we
749 728 needed to move forward to get general delta enabled.
750 729 """
751 730 yield self._magicstring
752 731 assert 'params' not in vars(self)
753 732 paramssize = self._unpack(_fstreamparamsize)[0]
754 733 if paramssize < 0:
755 734 raise error.BundleValueError('negative bundle param size: %i'
756 735 % paramssize)
757 736 yield _pack(_fstreamparamsize, paramssize)
758 737 if paramssize:
759 738 params = self._readexact(paramssize)
760 739 self._processallparams(params)
761 740 yield params
762 741 assert self._compengine.bundletype == 'UN'
763 742 # From there, payload might need to be decompressed
764 743 self._fp = self._compengine.decompressorreader(self._fp)
765 744 emptycount = 0
766 745 while emptycount < 2:
767 746 # so we can brainlessly loop
768 747 assert _fpartheadersize == _fpayloadsize
769 748 size = self._unpack(_fpartheadersize)[0]
770 749 yield _pack(_fpartheadersize, size)
771 750 if size:
772 751 emptycount = 0
773 752 else:
774 753 emptycount += 1
775 754 continue
776 755 if size == flaginterrupt:
777 756 continue
778 757 elif size < 0:
779 758 raise error.BundleValueError('negative chunk size: %i' % size)
780 759 yield self._readexact(size)
781 760
782 761
783 762 def iterparts(self):
784 763 """yield all parts contained in the stream"""
786 765 # make sure params have been loaded
787 766 self.params
788 767 # From there, payload needs to be decompressed
788 767 self._fp = self._compengine.decompressorreader(self._fp)
789 768 indebug(self.ui, 'start extraction of bundle2 parts')
790 769 headerblock = self._readpartheader()
791 770 while headerblock is not None:
792 771 part = unbundlepart(self.ui, headerblock, self._fp)
793 772 yield part
794 773 part.seek(0, 2)
795 774 headerblock = self._readpartheader()
796 775 indebug(self.ui, 'end of bundle2 stream')
797 776
798 777 def _readpartheader(self):
799 778 """reads a part header size and return the bytes blob
800 779
801 780 returns None if empty"""
802 781 headersize = self._unpack(_fpartheadersize)[0]
803 782 if headersize < 0:
804 783 raise error.BundleValueError('negative part header size: %i'
805 784 % headersize)
806 785 indebug(self.ui, 'part header size: %i' % headersize)
807 786 if headersize:
808 787 return self._readexact(headersize)
809 788 return None
810 789
811 790 def compressed(self):
812 791 self.params # load params
813 792 return self._compressed
814 793
815 794 def close(self):
816 795 """close underlying file"""
817 796 if util.safehasattr(self._fp, 'close'):
818 797 return self._fp.close()
819 798
820 799 formatmap = {'20': unbundle20}
821 800
822 801 b2streamparamsmap = {}
823 802
824 803 def b2streamparamhandler(name):
825 804 """register a handler for a stream level parameter"""
826 805 def decorator(func):
827 806 assert name not in formatmap
828 807 b2streamparamsmap[name] = func
829 808 return func
830 809 return decorator
831 810
832 811 @b2streamparamhandler('compression')
833 812 def processcompression(unbundler, param, value):
834 813 """read compression parameter and install payload decompression"""
835 814 if value not in util.compengines.supportedbundletypes:
836 815 raise error.BundleUnknownFeatureError(params=(param,),
837 816 values=(value,))
838 817 unbundler._compengine = util.compengines.forbundletype(value)
839 818 if value is not None:
840 819 unbundler._compressed = True
841 820
842 821 class bundlepart(object):
843 822 """A bundle2 part contains application level payload
844 823
845 824 The part `type` is used to route the part to the application level
846 825 handler.
847 826
848 827 The part payload is contained in ``part.data``. It could be raw bytes or a
849 828 generator of byte chunks.
850 829
851 830 You can add parameters to the part using the ``addparam`` method.
852 831 Parameters can be either mandatory (default) or advisory. Remote side
853 832 should be able to safely ignore the advisory ones.
854 833
855 834 Both data and parameters cannot be modified after the generation has begun.
856 835 """
857 836
858 837 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
859 838 data='', mandatory=True):
860 839 validateparttype(parttype)
861 840 self.id = None
862 841 self.type = parttype
863 842 self._data = data
864 843 self._mandatoryparams = list(mandatoryparams)
865 844 self._advisoryparams = list(advisoryparams)
866 845 # checking for duplicated entries
867 846 self._seenparams = set()
868 847 for pname, __ in self._mandatoryparams + self._advisoryparams:
869 848 if pname in self._seenparams:
870 849 raise error.ProgrammingError('duplicated params: %s' % pname)
871 850 self._seenparams.add(pname)
872 851 # status of the part's generation:
873 852 # - None: not started,
874 853 # - False: currently generated,
875 854 # - True: generation done.
876 855 self._generated = None
877 856 self.mandatory = mandatory
878 857
879 858 def __repr__(self):
880 859 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
881 860 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
882 861 % (cls, id(self), self.id, self.type, self.mandatory))
883 862
884 863 def copy(self):
885 864 """return a copy of the part
886 865
887 866 The new part has the very same content but no partid assigned yet.
888 867 Parts with generated data cannot be copied."""
889 868 assert not util.safehasattr(self.data, 'next')
890 869 return self.__class__(self.type, self._mandatoryparams,
891 870 self._advisoryparams, self._data, self.mandatory)
892 871
893 872 # methods used to define the part content
894 873 @property
895 874 def data(self):
896 875 return self._data
897 876
898 877 @data.setter
899 878 def data(self, data):
900 879 if self._generated is not None:
901 880 raise error.ReadOnlyPartError('part is being generated')
902 881 self._data = data
903 882
904 883 @property
905 884 def mandatoryparams(self):
906 885 # make it an immutable tuple to force people through ``addparam``
907 886 return tuple(self._mandatoryparams)
908 887
909 888 @property
910 889 def advisoryparams(self):
911 890 # make it an immutable tuple to force people through ``addparam``
912 891 return tuple(self._advisoryparams)
913 892
914 893 def addparam(self, name, value='', mandatory=True):
915 894 """add a parameter to the part
916 895
917 896 If 'mandatory' is set to True, the remote handler must claim support
918 897 for this parameter or the unbundling will be aborted.
919 898
920 899 The 'name' and 'value' cannot exceed 255 bytes each.
921 900 """
922 901 if self._generated is not None:
923 902 raise error.ReadOnlyPartError('part is being generated')
924 903 if name in self._seenparams:
925 904 raise ValueError('duplicated params: %s' % name)
926 905 self._seenparams.add(name)
927 906 params = self._advisoryparams
928 907 if mandatory:
929 908 params = self._mandatoryparams
930 909 params.append((name, value))
931 910
932 911 # methods used to generate the bundle2 stream
933 912 def getchunks(self, ui):
934 913 if self._generated is not None:
935 914 raise error.ProgrammingError('part can only be consumed once')
936 915 self._generated = False
937 916
938 917 if ui.debugflag:
939 918 msg = ['bundle2-output-part: "%s"' % self.type]
940 919 if not self.mandatory:
941 920 msg.append(' (advisory)')
942 921 nbmp = len(self.mandatoryparams)
943 922 nbap = len(self.advisoryparams)
944 923 if nbmp or nbap:
945 924 msg.append(' (params:')
946 925 if nbmp:
947 926 msg.append(' %i mandatory' % nbmp)
948 927 if nbap:
949 928 msg.append(' %i advisory' % nbap)
950 929 msg.append(')')
951 930 if not self.data:
952 931 msg.append(' empty payload')
953 932 elif util.safehasattr(self.data, 'next'):
954 933 msg.append(' streamed payload')
955 934 else:
956 935 msg.append(' %i bytes payload' % len(self.data))
957 936 msg.append('\n')
958 937 ui.debug(''.join(msg))
959 938
960 939 #### header
961 940 if self.mandatory:
962 941 parttype = self.type.upper()
963 942 else:
964 943 parttype = self.type.lower()
965 944 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
966 945 ## parttype
967 946 header = [_pack(_fparttypesize, len(parttype)),
968 947 parttype, _pack(_fpartid, self.id),
969 948 ]
970 949 ## parameters
971 950 # count
972 951 manpar = self.mandatoryparams
973 952 advpar = self.advisoryparams
974 953 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
975 954 # size
976 955 parsizes = []
977 956 for key, value in manpar:
978 957 parsizes.append(len(key))
979 958 parsizes.append(len(value))
980 959 for key, value in advpar:
981 960 parsizes.append(len(key))
982 961 parsizes.append(len(value))
983 962 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
984 963 header.append(paramsizes)
985 964 # key, value
986 965 for key, value in manpar:
987 966 header.append(key)
988 967 header.append(value)
989 968 for key, value in advpar:
990 969 header.append(key)
991 970 header.append(value)
992 971 ## finalize header
993 972 headerchunk = ''.join(header)
994 973 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
995 974 yield _pack(_fpartheadersize, len(headerchunk))
996 975 yield headerchunk
997 976 ## payload
998 977 try:
999 978 for chunk in self._payloadchunks():
1000 979 outdebug(ui, 'payload chunk size: %i' % len(chunk))
1001 980 yield _pack(_fpayloadsize, len(chunk))
1002 981 yield chunk
1003 982 except GeneratorExit:
1004 983 # GeneratorExit means that nobody is listening for our
1005 984 # results anyway, so just bail quickly rather than trying
1006 985 # to produce an error part.
1007 986 ui.debug('bundle2-generatorexit\n')
1008 987 raise
1009 988 except BaseException as exc:
1010 989 # backup exception data for later
1011 990 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1012 991 % exc)
1013 992 exc_info = sys.exc_info()
1014 993 msg = 'unexpected error: %s' % exc
1015 994 interpart = bundlepart('error:abort', [('message', msg)],
1016 995 mandatory=False)
1017 996 interpart.id = 0
1018 997 yield _pack(_fpayloadsize, -1)
1019 998 for chunk in interpart.getchunks(ui=ui):
1020 999 yield chunk
1021 1000 outdebug(ui, 'closing payload chunk')
1022 1001 # abort current part payload
1023 1002 yield _pack(_fpayloadsize, 0)
1024 1003 if pycompat.ispy3:
1025 1004 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1026 1005 else:
1027 1006 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1028 1007 # end of payload
1029 1008 outdebug(ui, 'closing payload chunk')
1030 1009 yield _pack(_fpayloadsize, 0)
1031 1010 self._generated = True
1032 1011
1033 1012 def _payloadchunks(self):
1034 1013 """yield chunks of the part payload
1035 1014
1036 1015 Exists to handle the different methods to provide data to a part."""
1037 1016 # we only support fixed size data now.
1038 1017 # This will be improved in the future.
1039 1018 if util.safehasattr(self.data, 'next'):
1040 1019 buff = util.chunkbuffer(self.data)
1041 1020 chunk = buff.read(preferedchunksize)
1042 1021 while chunk:
1043 1022 yield chunk
1044 1023 chunk = buff.read(preferedchunksize)
1045 1024 elif len(self.data):
1046 1025 yield self.data
1047 1026
1048 1027
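The framing that `getchunks()` emits can be reproduced with plain `struct`. This sketch mirrors the header layout built above (type size, type, part id, parameter counts and sizes) for the simple case of a part with no parameters; the format strings are assumed to match bundle2's module-level constants:

```python
import struct

# Assumed wire-format constants, mirroring bundle2's module-level
# _fparttypesize, _fpartid, _fpartparamcount, _fpartheadersize and
# _fpayloadsize definitions.
_fparttypesize = '>B'     # one byte: length of the part type string
_fpartid = '>I'           # four bytes: numeric part id
_fpartparamcount = '>BB'  # mandatory / advisory parameter counts
_fpartheadersize = '>i'   # size of the serialized header blob
_fpayloadsize = '>i'      # per-chunk payload size (0 closes the payload)

def _makefpartparamsizes(nbparams):
    return '>' + ('BB' * nbparams)

def frameheader(parttype, partid, mandatory=True):
    """Frame the header of a part with no parameters. An uppercase
    type marks the part mandatory, lowercase advisory, exactly as
    getchunks() does."""
    parttype = parttype.upper() if mandatory else parttype.lower()
    header = (struct.pack(_fparttypesize, len(parttype))
              + parttype.encode('ascii')
              + struct.pack(_fpartid, partid)
              + struct.pack(_fpartparamcount, 0, 0)
              + struct.pack(_makefpartparamsizes(0)))  # no param sizes
    return struct.pack(_fpartheadersize, len(header)) + header

def framepayload(chunks):
    """Frame payload chunks: each prefixed with its size, terminated by
    a zero-size chunk."""
    out = b''.join(struct.pack(_fpayloadsize, len(c)) + c for c in chunks)
    return out + struct.pack(_fpayloadsize, 0)
```

For instance, `frameheader('output', 0)` produces a 4-byte size prefix followed by a 13-byte header (1 type-size byte, 6 type bytes, 4 id bytes, 2 count bytes).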
1049 1028 flaginterrupt = -1
1050 1029
1051 1030 class interrupthandler(unpackermixin):
1052 1031 """read one part and process it with restricted capability
1053 1032
1054 1033 This allows transmitting exceptions raised on the producer side during part
1055 1034 iteration while the consumer is reading a part.
1056 1035
1057 1036 Parts processed in this manner only have access to a ui object."""
1058 1037
1059 1038 def __init__(self, ui, fp):
1060 1039 super(interrupthandler, self).__init__(fp)
1061 1040 self.ui = ui
1062 1041
1063 1042 def _readpartheader(self):
1064 1043 """reads a part header size and return the bytes blob
1065 1044
1066 1045 returns None if empty"""
1067 1046 headersize = self._unpack(_fpartheadersize)[0]
1068 1047 if headersize < 0:
1069 1048 raise error.BundleValueError('negative part header size: %i'
1070 1049 % headersize)
1071 1050 indebug(self.ui, 'part header size: %i\n' % headersize)
1072 1051 if headersize:
1073 1052 return self._readexact(headersize)
1074 1053 return None
1075 1054
1076 1055 def __call__(self):
1077 1056
1078 1057 self.ui.debug('bundle2-input-stream-interrupt:'
1079 1058 ' opening out of band context\n')
1080 1059 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1081 1060 headerblock = self._readpartheader()
1082 1061 if headerblock is None:
1083 1062 indebug(self.ui, 'no part found during interruption.')
1084 1063 return
1085 1064 part = unbundlepart(self.ui, headerblock, self._fp)
1086 1065 op = interruptoperation(self.ui)
1087 1066 _processpart(op, part)
1088 1067 self.ui.debug('bundle2-input-stream-interrupt:'
1089 1068 ' closing out of band context\n')
1090 1069
1091 1070 class interruptoperation(object):
1092 1071 """A limited operation to be used by part handlers during interruption
1093 1072
1094 1073 It only has access to a ui object.
1095 1074 """
1096 1075
1097 1076 def __init__(self, ui):
1098 1077 self.ui = ui
1099 1078 self.reply = None
1100 1079 self.captureoutput = False
1101 1080
1102 1081 @property
1103 1082 def repo(self):
1104 1083 raise error.ProgrammingError('no repo access from stream interruption')
1105 1084
1106 1085 def gettransaction(self):
1107 1086 raise TransactionUnavailable('no repo access from stream interruption')
1108 1087
1109 1088 class unbundlepart(unpackermixin):
1110 1089 """a bundle part read from a bundle"""
1111 1090
1112 1091 def __init__(self, ui, header, fp):
1113 1092 super(unbundlepart, self).__init__(fp)
1093 self._seekable = (util.safehasattr(fp, 'seek') and
1094 util.safehasattr(fp, 'tell'))
1114 1095 self.ui = ui
1115 1096 # unbundle state attr
1116 1097 self._headerdata = header
1117 1098 self._headeroffset = 0
1118 1099 self._initialized = False
1119 1100 self.consumed = False
1120 1101 # part data
1121 1102 self.id = None
1122 1103 self.type = None
1123 1104 self.mandatoryparams = None
1124 1105 self.advisoryparams = None
1125 1106 self.params = None
1126 1107 self.mandatorykeys = ()
1127 1108 self._payloadstream = None
1128 1109 self._readheader()
1129 1110 self._mandatory = None
1130 1111 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1131 1112 self._pos = 0
1132 1113
1133 1114 def _fromheader(self, size):
1134 1115 """return the next <size> byte from the header"""
1135 1116 offset = self._headeroffset
1136 1117 data = self._headerdata[offset:(offset + size)]
1137 1118 self._headeroffset = offset + size
1138 1119 return data
1139 1120
1140 1121 def _unpackheader(self, format):
1141 1122 """read given format from header
1142 1123
1143 1124 This automatically computes the size of the format to read."""
1144 1125 data = self._fromheader(struct.calcsize(format))
1145 1126 return _unpack(format, data)
1146 1127
1147 1128 def _initparams(self, mandatoryparams, advisoryparams):
1148 1129 """internal function to set up all logic-related parameters"""
1149 1130 # make it read only to prevent people touching it by mistake.
1150 1131 self.mandatoryparams = tuple(mandatoryparams)
1151 1132 self.advisoryparams = tuple(advisoryparams)
1152 1133 # user friendly UI
1153 1134 self.params = util.sortdict(self.mandatoryparams)
1154 1135 self.params.update(self.advisoryparams)
1155 1136 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1156 1137
1157 1138 def _payloadchunks(self, chunknum=0):
1158 1139 '''seek to specified chunk and start yielding data'''
1159 1140 if len(self._chunkindex) == 0:
1160 1141 assert chunknum == 0, 'Must start with chunk 0'
1161 self._chunkindex.append((0, super(unbundlepart, self).tell()))
1142 self._chunkindex.append((0, self._tellfp()))
1162 1143 else:
1163 1144 assert chunknum < len(self._chunkindex), \
1164 1145 'Unknown chunk %d' % chunknum
1165 super(unbundlepart, self).seek(self._chunkindex[chunknum][1])
1146 self._seekfp(self._chunkindex[chunknum][1])
1166 1147
1167 1148 pos = self._chunkindex[chunknum][0]
1168 1149 payloadsize = self._unpack(_fpayloadsize)[0]
1169 1150 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1170 1151 while payloadsize:
1171 1152 if payloadsize == flaginterrupt:
1172 1153 # interruption detection, the handler will now read a
1173 1154 # single part and process it.
1174 1155 interrupthandler(self.ui, self._fp)()
1175 1156 elif payloadsize < 0:
1176 1157 msg = 'negative payload chunk size: %i' % payloadsize
1177 1158 raise error.BundleValueError(msg)
1178 1159 else:
1179 1160 result = self._readexact(payloadsize)
1180 1161 chunknum += 1
1181 1162 pos += payloadsize
1182 1163 if chunknum == len(self._chunkindex):
1183 self._chunkindex.append((pos,
1184 super(unbundlepart, self).tell()))
1164 self._chunkindex.append((pos, self._tellfp()))
1185 1165 yield result
1186 1166 payloadsize = self._unpack(_fpayloadsize)[0]
1187 1167 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1188 1168
1189 1169 def _findchunk(self, pos):
1190 1170 '''for a given payload position, return a chunk number and offset'''
1191 1171 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1192 1172 if ppos == pos:
1193 1173 return chunk, 0
1194 1174 elif ppos > pos:
1195 1175 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1196 1176 raise ValueError('Unknown chunk')
1197 1177
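`_findchunk()` performs a linear scan over `_chunkindex`, whose entries pair each chunk's starting payload offset with the matching file offset. A standalone sketch of the same lookup:

```python
def findchunk(chunkindex, pos):
    """Return (chunk number, offset inside that chunk) for a payload
    position; chunkindex holds (payload offset, file offset) pairs for
    each chunk start, as _chunkindex does above."""
    for chunk, (ppos, fpos) in enumerate(chunkindex):
        if ppos == pos:
            return chunk, 0
        elif ppos > pos:
            # pos lands inside the previous chunk
            return chunk - 1, pos - chunkindex[chunk - 1][0]
    raise ValueError('Unknown chunk')
```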
1198 1178 def _readheader(self):
1199 1179 """read the header and setup the object"""
1200 1180 typesize = self._unpackheader(_fparttypesize)[0]
1201 1181 self.type = self._fromheader(typesize)
1202 1182 indebug(self.ui, 'part type: "%s"' % self.type)
1203 1183 self.id = self._unpackheader(_fpartid)[0]
1204 1184 indebug(self.ui, 'part id: "%s"' % self.id)
1205 1185 # extract mandatory bit from type
1206 1186 self.mandatory = (self.type != self.type.lower())
1207 1187 self.type = self.type.lower()
1208 1188 ## reading parameters
1209 1189 # param count
1210 1190 mancount, advcount = self._unpackheader(_fpartparamcount)
1211 1191 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1212 1192 # param size
1213 1193 fparamsizes = _makefpartparamsizes(mancount + advcount)
1214 1194 paramsizes = self._unpackheader(fparamsizes)
1215 1195 # make it a list of couple again
1216 1196 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1217 1197 # split mandatory from advisory
1218 1198 mansizes = paramsizes[:mancount]
1219 1199 advsizes = paramsizes[mancount:]
1220 1200 # retrieve param value
1221 1201 manparams = []
1222 1202 for key, value in mansizes:
1223 1203 manparams.append((self._fromheader(key), self._fromheader(value)))
1224 1204 advparams = []
1225 1205 for key, value in advsizes:
1226 1206 advparams.append((self._fromheader(key), self._fromheader(value)))
1227 1207 self._initparams(manparams, advparams)
1228 1208 ## part payload
1229 1209 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1230 1210 # we read the data, tell it
1231 1211 self._initialized = True
1232 1212
1233 1213 def read(self, size=None):
1234 1214 """read payload data"""
1235 1215 if not self._initialized:
1236 1216 self._readheader()
1237 1217 if size is None:
1238 1218 data = self._payloadstream.read()
1239 1219 else:
1240 1220 data = self._payloadstream.read(size)
1241 1221 self._pos += len(data)
1242 1222 if size is None or len(data) < size:
1243 1223 if not self.consumed and self._pos:
1244 1224 self.ui.debug('bundle2-input-part: total payload size %i\n'
1245 1225 % self._pos)
1246 1226 self.consumed = True
1247 1227 return data
1248 1228
1249 1229 def tell(self):
1250 1230 return self._pos
1251 1231
1252 1232 def seek(self, offset, whence=0):
1253 1233 if whence == 0:
1254 1234 newpos = offset
1255 1235 elif whence == 1:
1256 1236 newpos = self._pos + offset
1257 1237 elif whence == 2:
1258 1238 if not self.consumed:
1259 1239 self.read()
1260 1240 newpos = self._chunkindex[-1][0] - offset
1261 1241 else:
1262 1242 raise ValueError('Unknown whence value: %r' % (whence,))
1263 1243
1264 1244 if newpos > self._chunkindex[-1][0] and not self.consumed:
1265 1245 self.read()
1266 1246 if not 0 <= newpos <= self._chunkindex[-1][0]:
1267 1247 raise ValueError('Offset out of range')
1268 1248
1269 1249 if self._pos != newpos:
1270 1250 chunk, internaloffset = self._findchunk(newpos)
1271 1251 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1272 1252 adjust = self.read(internaloffset)
1273 1253 if len(adjust) != internaloffset:
1274 1254 raise error.Abort(_('Seek failed\n'))
1275 1255 self._pos = newpos
1276 1256
1257 def _seekfp(self, offset, whence=0):
1258 """move the underlying file pointer
1259
1260 This method is meant for internal usage by the bundle2 protocol only.
1261 It directly manipulates the low level stream, including bundle2 level
1262 instructions.
1263
1264 Do not use it to implement higher-level logic or methods."""
1265 if self._seekable:
1266 return self._fp.seek(offset, whence)
1267 else:
1268 raise NotImplementedError(_('File pointer is not seekable'))
1269
1270 def _tellfp(self):
1271 """return the file offset, or None if file is not seekable
1272
1273 This method is meant for internal usage by the bundle2 protocol only.
1274 It directly manipulates the low level stream, including bundle2 level
1275 instructions.
1276
1277 Do not use it to implement higher-level logic or methods."""
1278 if self._seekable:
1279 try:
1280 return self._fp.tell()
1281 except IOError as e:
1282 if e.errno == errno.ESPIPE:
1283 self._seekable = False
1284 else:
1285 raise
1286 return None
1287
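`_seekfp`/`_tellfp` implement a lazy seekability check: the stream is assumed seekable if it exposes `seek` and `tell`, and is downgraded on the first `ESPIPE`. A self-contained sketch of that pattern (the class name is made up for illustration):

```python
import errno

class seekabilityprobe(object):
    """Hypothetical standalone version of unbundlepart's lazy
    seekability handling: trust seek/tell until tell() raises ESPIPE
    (the stream was really a pipe), then report None from then on."""

    def __init__(self, fp):
        self._fp = fp
        self._seekable = (hasattr(fp, 'seek') and hasattr(fp, 'tell'))

    def tellfp(self):
        """Return the file offset, or None if the file is not seekable."""
        if self._seekable:
            try:
                return self._fp.tell()
            except IOError as e:
                if e.errno == errno.ESPIPE:
                    self._seekable = False
                else:
                    raise
        return None
```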
1277 1288 # These are only the static capabilities.
1278 1289 # Check the 'getrepocaps' function for the rest.
1279 1290 capabilities = {'HG20': (),
1280 1291 'error': ('abort', 'unsupportedcontent', 'pushraced',
1281 1292 'pushkey'),
1282 1293 'listkeys': (),
1283 1294 'pushkey': (),
1284 1295 'digests': tuple(sorted(util.DIGESTS.keys())),
1285 1296 'remote-changegroup': ('http', 'https'),
1286 1297 'hgtagsfnodes': (),
1287 1298 }
1288 1299
1289 1300 def getrepocaps(repo, allowpushback=False):
1290 1301 """return the bundle2 capabilities for a given repo
1291 1302
1292 1303 Exists to allow extensions (like evolution) to mutate the capabilities.
1293 1304 """
1294 1305 caps = capabilities.copy()
1295 1306 caps['changegroup'] = tuple(sorted(
1296 1307 changegroup.supportedincomingversions(repo)))
1297 1308 if obsolete.isenabled(repo, obsolete.exchangeopt):
1298 1309 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1299 1310 caps['obsmarkers'] = supportedformat
1300 1311 if allowpushback:
1301 1312 caps['pushback'] = ()
1302 1313 return caps
1303 1314
1304 1315 def bundle2caps(remote):
1305 1316 """return the bundle capabilities of a peer as dict"""
1306 1317 raw = remote.capable('bundle2')
1307 1318 if not raw and raw != '':
1308 1319 return {}
1309 1320 capsblob = urlreq.unquote(remote.capable('bundle2'))
1310 1321 return decodecaps(capsblob)
1311 1322
1312 1323 def obsmarkersversion(caps):
1313 1324 """extract the list of supported obsmarkers versions from a bundle2caps dict
1314 1325 """
1315 1326 obscaps = caps.get('obsmarkers', ())
1316 1327 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1317 1328
1318 1329 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1319 1330 compopts=None):
1320 1331 """Write a bundle file and return its filename.
1321 1332
1322 1333 Existing files will not be overwritten.
1323 1334 If no filename is specified, a temporary file is created.
1324 1335 bz2 compression can be turned off.
1325 1336 The bundle file will be deleted in case of errors.
1326 1337 """
1327 1338
1328 1339 if bundletype == "HG20":
1329 1340 bundle = bundle20(ui)
1330 1341 bundle.setcompression(compression, compopts)
1331 1342 part = bundle.newpart('changegroup', data=cg.getchunks())
1332 1343 part.addparam('version', cg.version)
1333 1344 if 'clcount' in cg.extras:
1334 1345 part.addparam('nbchanges', str(cg.extras['clcount']),
1335 1346 mandatory=False)
1336 1347 chunkiter = bundle.getchunks()
1337 1348 else:
1338 1349 # compression argument is only for the bundle2 case
1339 1350 assert compression is None
1340 1351 if cg.version != '01':
1341 1352 raise error.Abort(_('old bundle types only support v1 '
1342 1353 'changegroups'))
1343 1354 header, comp = bundletypes[bundletype]
1344 1355 if comp not in util.compengines.supportedbundletypes:
1345 1356 raise error.Abort(_('unknown stream compression type: %s')
1346 1357 % comp)
1347 1358 compengine = util.compengines.forbundletype(comp)
1348 1359 def chunkiter():
1349 1360 yield header
1350 1361 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1351 1362 yield chunk
1352 1363 chunkiter = chunkiter()
1353 1364
1354 1365 # parse the changegroup data, otherwise we will block
1355 1366 # in case of sshrepo because we don't know the end of the stream
1356 1367 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1357 1368
1358 1369 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1359 1370 def handlechangegroup(op, inpart):
1360 1371 """apply a changegroup part on the repo
1361 1372
1362 1373 This is a very early implementation that will be massively reworked
1363 1374 before being inflicted on any end-user.
1364 1375 """
1365 1376 # Make sure we trigger a transaction creation
1366 1377 #
1367 1378 # The addchangegroup function will get a transaction object by itself, but
1368 1379 # we need to make sure we trigger the creation of a transaction object used
1369 1380 # for the whole processing scope.
1370 1381 op.gettransaction()
1371 1382 unpackerversion = inpart.params.get('version', '01')
1372 1383 # We should raise an appropriate exception here
1373 1384 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1374 1385 # the source and url passed here are overwritten by the ones contained in
1375 1386 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1376 1387 nbchangesets = None
1377 1388 if 'nbchanges' in inpart.params:
1378 1389 nbchangesets = int(inpart.params.get('nbchanges'))
1379 1390 if ('treemanifest' in inpart.params and
1380 1391 'treemanifest' not in op.repo.requirements):
1381 1392 if len(op.repo.changelog) != 0:
1382 1393 raise error.Abort(_(
1383 1394 "bundle contains tree manifests, but local repo is "
1384 1395 "non-empty and does not use tree manifests"))
1385 1396 op.repo.requirements.add('treemanifest')
1386 1397 op.repo._applyopenerreqs()
1387 1398 op.repo._writerequirements()
1388 1399 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1389 1400 op.records.add('changegroup', {'return': ret})
1390 1401 if op.reply is not None:
1391 1402 # This is definitely not the final form of this
1392 1403 # return. But one needs to start somewhere.
1393 1404 part = op.reply.newpart('reply:changegroup', mandatory=False)
1394 1405 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1395 1406 part.addparam('return', '%i' % ret, mandatory=False)
1396 1407 assert not inpart.read()
1397 1408
1398 1409 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1399 1410 ['digest:%s' % k for k in util.DIGESTS.keys()])
1400 1411 @parthandler('remote-changegroup', _remotechangegroupparams)
1401 1412 def handleremotechangegroup(op, inpart):
1402 1413 """apply a bundle10 on the repo, given an url and validation information
1403 1414
1404 1415 All the information about the remote bundle to import is given as
1405 1416 parameters. The parameters include:
1406 1417 - url: the url to the bundle10.
1407 1418 - size: the bundle10 file size. It is used to validate what was
1408 1419 retrieved by the client matches the server knowledge about the bundle.
1409 1420 - digests: a space separated list of the digest types provided as
1410 1421 parameters.
1411 1422 - digest:<digest-type>: the hexadecimal representation of the digest with
1412 1423 that name. Like the size, it is used to validate what was retrieved by
1413 1424 the client matches what the server knows about the bundle.
1414 1425
1415 1426 When multiple digest types are given, all of them are checked.
1416 1427 """
1417 1428 try:
1418 1429 raw_url = inpart.params['url']
1419 1430 except KeyError:
1420 1431 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1421 1432 parsed_url = util.url(raw_url)
1422 1433 if parsed_url.scheme not in capabilities['remote-changegroup']:
1423 1434 raise error.Abort(_('remote-changegroup does not support %s urls') %
1424 1435 parsed_url.scheme)
1425 1436
1426 1437 try:
1427 1438 size = int(inpart.params['size'])
1428 1439 except ValueError:
1429 1440 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1430 1441 % 'size')
1431 1442 except KeyError:
1432 1443 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1433 1444
1434 1445 digests = {}
1435 1446 for typ in inpart.params.get('digests', '').split():
1436 1447 param = 'digest:%s' % typ
1437 1448 try:
1438 1449 value = inpart.params[param]
1439 1450 except KeyError:
1440 1451 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1441 1452 param)
1442 1453 digests[typ] = value
1443 1454
1444 1455 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1445 1456
1446 1457 # Make sure we trigger a transaction creation
1447 1458 #
1448 1459 # The addchangegroup function will get a transaction object by itself, but
1449 1460 # we need to make sure we trigger the creation of a transaction object used
1450 1461 # for the whole processing scope.
1451 1462 op.gettransaction()
1452 1463 from . import exchange
1453 1464 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1454 1465 if not isinstance(cg, changegroup.cg1unpacker):
1455 1466 raise error.Abort(_('%s: not a bundle version 1.0') %
1456 1467 util.hidepassword(raw_url))
1457 1468 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1458 1469 op.records.add('changegroup', {'return': ret})
1459 1470 if op.reply is not None:
1460 1471 # This is definitely not the final form of this
1461 1472 # return. But one needs to start somewhere.
1462 1473 part = op.reply.newpart('reply:changegroup')
1463 1474 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1464 1475 part.addparam('return', '%i' % ret, mandatory=False)
1465 1476 try:
1466 1477 real_part.validate()
1467 1478 except error.Abort as e:
1468 1479 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1469 1480 (util.hidepassword(raw_url), str(e)))
1470 1481 assert not inpart.read()
1471 1482
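`util.digestchecker` wraps the opened URL so that the advertised size and digests can be verified against what was actually read. A hypothetical standalone analogue using `hashlib` (the real helper lives in Mercurial's `util` module and differs in details):

```python
import hashlib

class digestchecker(object):
    """Hypothetical analogue of util.digestchecker: wrap a file object,
    hash everything read, and let validate() confirm size and digests
    once the stream has been consumed."""

    def __init__(self, fh, size, digests):
        self._fh = fh
        self._size = size
        self._read = 0
        self._digests = digests  # name -> expected hex digest
        self._hashes = dict((k, hashlib.new(k)) for k in digests)

    def read(self, length=-1):
        content = self._fh.read(length)
        self._read += len(content)
        for h in self._hashes.values():
            h.update(content)
        return content

    def validate(self):
        if self._read != self._size:
            raise ValueError('size mismatch: expected %d, read %d'
                             % (self._size, self._read))
        for k, h in self._hashes.items():
            if h.hexdigest() != self._digests[k]:
                raise ValueError('%s digest mismatch' % k)
```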
1472 1483 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1473 1484 def handlereplychangegroup(op, inpart):
1474 1485 ret = int(inpart.params['return'])
1475 1486 replyto = int(inpart.params['in-reply-to'])
1476 1487 op.records.add('changegroup', {'return': ret}, replyto)
1477 1488
1478 1489 @parthandler('check:heads')
1479 1490 def handlecheckheads(op, inpart):
1480 1491 """check that the heads of the repo did not change
1481 1492
1482 1493 This is used to detect a push race when using unbundle.
1483 1494 This replaces the "heads" argument of unbundle."""
1484 1495 h = inpart.read(20)
1485 1496 heads = []
1486 1497 while len(h) == 20:
1487 1498 heads.append(h)
1488 1499 h = inpart.read(20)
1489 1500 assert not h
1490 1501 # Trigger a transaction so that we are guaranteed to have the lock now.
1491 1502 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1492 1503 op.gettransaction()
1493 1504 if sorted(heads) != sorted(op.repo.heads()):
1494 1505 raise error.PushRaced('repository changed while pushing - '
1495 1506 'please try again')
1496 1507
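The `check:heads` payload is nothing but concatenated 20-byte binary changeset nodes; the handler above reads them until the stream runs dry. As a standalone reader:

```python
def readheads(fp):
    """Read concatenated 20-byte binary nodes until the stream is
    exhausted; a trailing partial record means a corrupt payload."""
    heads = []
    h = fp.read(20)
    while len(h) == 20:
        heads.append(h)
        h = fp.read(20)
    assert not h  # payload length must be a multiple of 20
    return heads
```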
1497 1508 @parthandler('output')
1498 1509 def handleoutput(op, inpart):
1499 1510 """forward output captured on the server to the client"""
1500 1511 for line in inpart.read().splitlines():
1501 1512 op.ui.status(_('remote: %s\n') % line)
1502 1513
1503 1514 @parthandler('replycaps')
1504 1515 def handlereplycaps(op, inpart):
1505 1516 """Notify that a reply bundle should be created
1506 1517
1507 1518 The payload contains the capabilities information for the reply"""
1508 1519 caps = decodecaps(inpart.read())
1509 1520 if op.reply is None:
1510 1521 op.reply = bundle20(op.ui, caps)
1511 1522
1512 1523 class AbortFromPart(error.Abort):
1513 1524 """Sub-class of Abort that denotes an error from a bundle2 part."""
1514 1525
1515 1526 @parthandler('error:abort', ('message', 'hint'))
1516 1527 def handleerrorabort(op, inpart):
1517 1528 """Used to transmit abort error over the wire"""
1518 1529 raise AbortFromPart(inpart.params['message'],
1519 1530 hint=inpart.params.get('hint'))
1520 1531
1521 1532 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1522 1533 'in-reply-to'))
1523 1534 def handleerrorpushkey(op, inpart):
1524 1535 """Used to transmit failure of a mandatory pushkey over the wire"""
1525 1536 kwargs = {}
1526 1537 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1527 1538 value = inpart.params.get(name)
1528 1539 if value is not None:
1529 1540 kwargs[name] = value
1530 1541 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1531 1542
1532 1543 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1533 1544 def handleerrorunsupportedcontent(op, inpart):
1534 1545 """Used to transmit unknown content error over the wire"""
1535 1546 kwargs = {}
1536 1547 parttype = inpart.params.get('parttype')
1537 1548 if parttype is not None:
1538 1549 kwargs['parttype'] = parttype
1539 1550 params = inpart.params.get('params')
1540 1551 if params is not None:
1541 1552 kwargs['params'] = params.split('\0')
1542 1553
1543 1554 raise error.BundleUnknownFeatureError(**kwargs)
1544 1555
1545 1556 @parthandler('error:pushraced', ('message',))
1546 1557 def handleerrorpushraced(op, inpart):
1547 1558 """Used to transmit push race error over the wire"""
1548 1559 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1549 1560
1550 1561 @parthandler('listkeys', ('namespace',))
1551 1562 def handlelistkeys(op, inpart):
1552 1563 """retrieve pushkey namespace content stored in a bundle2"""
1553 1564 namespace = inpart.params['namespace']
1554 1565 r = pushkey.decodekeys(inpart.read())
1555 1566 op.records.add('listkeys', (namespace, r))
1556 1567
1557 1568 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1558 1569 def handlepushkey(op, inpart):
1559 1570 """process a pushkey request"""
1560 1571 dec = pushkey.decode
1561 1572 namespace = dec(inpart.params['namespace'])
1562 1573 key = dec(inpart.params['key'])
1563 1574 old = dec(inpart.params['old'])
1564 1575 new = dec(inpart.params['new'])
1565 1576 # Grab the transaction to ensure that we have the lock before performing the
1566 1577 # pushkey.
1567 1578 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1568 1579 op.gettransaction()
1569 1580 ret = op.repo.pushkey(namespace, key, old, new)
1570 1581 record = {'namespace': namespace,
1571 1582 'key': key,
1572 1583 'old': old,
1573 1584 'new': new}
1574 1585 op.records.add('pushkey', record)
1575 1586 if op.reply is not None:
1576 1587 rpart = op.reply.newpart('reply:pushkey')
1577 1588 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1578 1589 rpart.addparam('return', '%i' % ret, mandatory=False)
1579 1590 if inpart.mandatory and not ret:
1580 1591 kwargs = {}
1581 1592 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1582 1593 if key in inpart.params:
1583 1594 kwargs[key] = inpart.params[key]
1584 1595 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1585 1596
1586 1597 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1587 1598 def handlepushkeyreply(op, inpart):
1588 1599 """retrieve the result of a pushkey request"""
1589 1600 ret = int(inpart.params['return'])
1590 1601 partid = int(inpart.params['in-reply-to'])
1591 1602 op.records.add('pushkey', {'return': ret}, partid)
1592 1603
1593 1604 @parthandler('obsmarkers')
1594 1605 def handleobsmarker(op, inpart):
1595 1606 """add a stream of obsmarkers to the repo"""
1596 1607 tr = op.gettransaction()
1597 1608 markerdata = inpart.read()
1598 1609 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1599 1610 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1600 1611 % len(markerdata))
1601 1612 # The mergemarkers call will crash if marker creation is not enabled.
1602 1613 # we want to avoid this if the part is advisory.
1603 1614 if not inpart.mandatory and op.repo.obsstore.readonly:
1604 1615 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1605 1616 return
1606 1617 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1607 1618 if new:
1608 1619 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1609 1620 op.records.add('obsmarkers', {'new': new})
1610 1621 if op.reply is not None:
1611 1622 rpart = op.reply.newpart('reply:obsmarkers')
1612 1623 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1613 1624 rpart.addparam('new', '%i' % new, mandatory=False)
1614 1625
1615 1626
1616 1627 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1617 1628 def handleobsmarkerreply(op, inpart):
1618 1629 """retrieve the result of an obsmarkers request"""
1619 1630 ret = int(inpart.params['new'])
1620 1631 partid = int(inpart.params['in-reply-to'])
1621 1632 op.records.add('obsmarkers', {'new': ret}, partid)
1622 1633
1623 1634 @parthandler('hgtagsfnodes')
1624 1635 def handlehgtagsfnodes(op, inpart):
1625 1636 """Applies .hgtags fnodes cache entries to the local repo.
1626 1637
1627 1638 Payload is pairs of 20 byte changeset nodes and filenodes.
1628 1639 """
1629 1640 # Grab the transaction so we ensure that we have the lock at this point.
1630 1641 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1631 1642 op.gettransaction()
1632 1643 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1633 1644
1634 1645 count = 0
1635 1646 while True:
1636 1647 node = inpart.read(20)
1637 1648 fnode = inpart.read(20)
1638 1649 if len(node) < 20 or len(fnode) < 20:
1639 1650 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1640 1651 break
1641 1652 cache.setfnode(node, fnode)
1642 1653 count += 1
1643 1654
1644 1655 cache.write()
1645 1656 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)