pycompat: extract helper to raise exception with traceback...
Yuya Nishihara
r32186:76f9a000 default
@@ -1,1672 +1,1669 b''
1 1 # bundle2.py - generic container format to transmit arbitrary data.
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 """Handling of the new bundle2 format
8 8
9 9 The goal of bundle2 is to act as an atomic packet to transmit a set of
10 10 payloads in an application-agnostic way. It consists of a sequence of "parts"
11 11 that will be handed to and processed by the application layer.
12 12
13 13
14 14 General format architecture
15 15 ===========================
16 16
17 17 The format is structured as follows:
18 18
19 19 - magic string
20 20 - stream level parameters
21 21 - payload parts (any number)
22 22 - end of stream marker.
23 23
24 24 The binary format
25 25 ============================
26 26
27 27 All numbers are unsigned and big-endian.
28 28
29 29 stream level parameters
30 30 ------------------------
31 31
32 32 The binary format is as follows:
33 33
34 34 :params size: int32
35 35
36 36 The total number of bytes used by the parameters
37 37
38 38 :params value: arbitrary number of bytes
39 39
40 40 A blob of `params size` containing the serialized version of all stream level
41 41 parameters.
42 42
43 43 The blob contains a space-separated list of parameters. Parameters with a value
44 44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45 45
46 46 Empty names are forbidden.
47 47
48 48 Names MUST start with a letter. If this first letter is lower case, the
49 49 parameter is advisory and can be safely ignored. However, when the first
50 50 letter is capital, the parameter is mandatory and the bundling process MUST
51 51 stop if it is not able to process it.
52 52
53 53 Stream parameters use a simple textual format for two main reasons:
54 54
55 55 - Stream level parameters should remain simple and we want to discourage any
56 56 crazy usage.
57 57 - Textual data allows easy human inspection of a bundle2 header in case of
58 58 trouble.
59 59
60 60 Any application-level options MUST go into a bundle2 part instead.
61 61
62 62 Payload part
63 63 ------------------------
64 64
65 65 The binary format is as follows:
66 66
67 67 :header size: int32
68 68
69 69 The total number of bytes used by the part header. When the header is empty
70 70 (size = 0) this is interpreted as the end of stream marker.
71 71
72 72 :header:
73 73
74 74 The header defines how to interpret the part. It contains two pieces of
75 75 data: the part type, and the part parameters.
76 76
77 77 The part type is used to route the part to an application-level handler that
78 78 can interpret the payload.
79 79
80 80 Part parameters are passed to the application level handler. They are
81 81 meant to convey information that will help the application-level handler
82 82 interpret the part payload.
83 83
84 84 The binary format of the header is as follows:
85 85
86 86 :typesize: (one byte)
87 87
88 88 :parttype: alphanumerical part name (restricted to [a-zA-Z0-9_:-]*)
89 89
90 90 :partid: A 32-bit integer (unique in the bundle) that can be used to refer
91 91 to this part.
92 92
93 93 :parameters:
94 94
95 95 A part's parameters may have arbitrary content; the binary structure is::
96 96
97 97 <mandatory-count><advisory-count><param-sizes><param-data>
98 98
99 99 :mandatory-count: 1 byte, number of mandatory parameters
100 100
101 101 :advisory-count: 1 byte, number of advisory parameters
102 102
103 103 :param-sizes:
104 104
105 105 N pairs of bytes, where N is the total number of parameters. Each
106 106 pair contains (<size-of-key>, <size-of-value>) for one parameter.
107 107
108 108 :param-data:
109 109
110 110 A blob of bytes from which each parameter key and value can be
111 111 retrieved using the list of size pairs stored in the previous
112 112 field.
113 113
114 114 Mandatory parameters come first, then the advisory ones.
115 115
116 116 Each parameter's key MUST be unique within the part.
117 117
118 118 :payload:
119 119
120 120 payload is a series of `<chunksize><chunkdata>`.
121 121
122 122 `chunksize` is an int32, `chunkdata` is plain bytes (as many as
123 123 `chunksize` says). The payload part is concluded by a zero-size chunk.
124 124
125 125 The current implementation always produces either zero or one chunk.
126 126 This is an implementation limitation that will ultimately be lifted.
127 127
128 128 `chunksize` can be negative to trigger special case processing. No such
129 129 processing is in place yet.
130 130
131 131 Bundle processing
132 132 ============================
133 133
134 134 Each part is processed in order using a "part handler". Handlers are registered
135 135 for a certain part type.
136 136
137 137 The matching of a part to its handler is case insensitive. The case of the
138 138 part type is used to know if a part is mandatory or advisory. If the part type
139 139 contains any uppercase character, it is considered mandatory. When no handler is
140 140 known for a Mandatory part, the process is aborted and an exception is raised.
141 141 If the part is advisory and no handler is known, the part is ignored. When the
142 142 process is aborted, the full bundle is still read from the stream to keep the
143 143 channel usable, but none of the parts read after an abort are processed. In the
144 144 future, dropping the stream may become an option for channels we do not care to
145 145 preserve.
146 146 """
147 147
148 148 from __future__ import absolute_import
149 149
150 150 import errno
151 151 import re
152 152 import string
153 153 import struct
154 154 import sys
155 155
156 156 from .i18n import _
157 157 from . import (
158 158 changegroup,
159 159 error,
160 160 obsolete,
161 161 pushkey,
162 162 pycompat,
163 163 tags,
164 164 url,
165 165 util,
166 166 )
167 167
168 168 urlerr = util.urlerr
169 169 urlreq = util.urlreq
170 170
171 171 _pack = struct.pack
172 172 _unpack = struct.unpack
173 173
174 174 _fstreamparamsize = '>i'
175 175 _fpartheadersize = '>i'
176 176 _fparttypesize = '>B'
177 177 _fpartid = '>I'
178 178 _fpayloadsize = '>i'
179 179 _fpartparamcount = '>BB'
180 180
181 181 preferedchunksize = 4096
182 182
183 183 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
184 184
185 185 def outdebug(ui, message):
186 186 """debug regarding output stream (bundling)"""
187 187 if ui.configbool('devel', 'bundle2.debug', False):
188 188 ui.debug('bundle2-output: %s\n' % message)
189 189
190 190 def indebug(ui, message):
191 191 """debug on input stream (unbundling)"""
192 192 if ui.configbool('devel', 'bundle2.debug', False):
193 193 ui.debug('bundle2-input: %s\n' % message)
194 194
195 195 def validateparttype(parttype):
196 196 """raise ValueError if a parttype contains invalid character"""
197 197 if _parttypeforbidden.search(parttype):
198 198 raise ValueError(parttype)
199 199
200 200 def _makefpartparamsizes(nbparams):
201 201 """return a struct format to read part parameter sizes
202 202
203 203 The number of parameters is variable, so we need to build that format
204 204 dynamically.
205 205 """
206 206 return '>'+('BB'*nbparams)
207 207
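For example, a part header announcing one mandatory and two advisory parameters carries three (key-size, value-size) byte pairs, so the dynamically built format reads six unsigned bytes. A small sanity check, not part of the module:

    import struct

    fmt = '>' + ('BB' * 3)            # what _makefpartparamsizes(3) returns
    assert fmt == '>BBBBBB'
    assert struct.calcsize(fmt) == 6  # six single-byte sizes follow in the header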
208 208 parthandlermapping = {}
209 209
210 210 def parthandler(parttype, params=()):
211 211 """decorator that register a function as a bundle2 part handler
212 212
213 213 eg::
214 214
215 215 @parthandler('myparttype', ('mandatory', 'param', 'handled'))
216 216 def myparttypehandler(...):
217 217 '''process a part of type "my part".'''
218 218 ...
219 219 """
220 220 validateparttype(parttype)
221 221 def _decorator(func):
222 222 lparttype = parttype.lower() # enforce lower case matching.
223 223 assert lparttype not in parthandlermapping
224 224 parthandlermapping[lparttype] = func
225 225 func.params = frozenset(params)
226 226 return func
227 227 return _decorator
228 228
229 229 class unbundlerecords(object):
230 230 """keep record of what happens during and unbundle
231 231
232 232 New records are added using `records.add('cat', obj)`, where 'cat' is a
233 233 category of record and obj is an arbitrary object.
234 234
235 235 `records['cat']` will return all entries of this category 'cat'.
236 236
237 237 Iterating on the object itself will yield `('category', obj)` tuples
238 238 for all entries.
239 239
240 240 All iterations happen in chronological order.
241 241 """
242 242
243 243 def __init__(self):
244 244 self._categories = {}
245 245 self._sequences = []
246 246 self._replies = {}
247 247
248 248 def add(self, category, entry, inreplyto=None):
249 249 """add a new record of a given category.
250 250
251 251 The entry can then be retrieved in the list returned by
252 252 self['category']."""
253 253 self._categories.setdefault(category, []).append(entry)
254 254 self._sequences.append((category, entry))
255 255 if inreplyto is not None:
256 256 self.getreplies(inreplyto).add(category, entry)
257 257
258 258 def getreplies(self, partid):
259 259 """get the records that are replies to a specific part"""
260 260 return self._replies.setdefault(partid, unbundlerecords())
261 261
262 262 def __getitem__(self, cat):
263 263 return tuple(self._categories.get(cat, ()))
264 264
265 265 def __iter__(self):
266 266 return iter(self._sequences)
267 267
268 268 def __len__(self):
269 269 return len(self._sequences)
270 270
271 271 def __nonzero__(self):
272 272 return bool(self._sequences)
273 273
274 274 __bool__ = __nonzero__
275 275
276 276 class bundleoperation(object):
277 277 """an object that represents a single bundling process
278 278
279 279 Its purpose is to carry unbundle-related objects and states.
280 280
281 281 A new object should be created at the beginning of each bundle processing.
282 282 The object is to be returned by the processing function.
283 283
284 284 The object has very little content now; it will ultimately contain:
285 285 * an access to the repo the bundle is applied to,
286 286 * a ui object,
287 287 * a way to retrieve a transaction to add changes to the repo,
288 288 * a way to record the result of processing each part,
289 289 * a way to construct a bundle response when applicable.
290 290 """
291 291
292 292 def __init__(self, repo, transactiongetter, captureoutput=True):
293 293 self.repo = repo
294 294 self.ui = repo.ui
295 295 self.records = unbundlerecords()
296 296 self.gettransaction = transactiongetter
297 297 self.reply = None
298 298 self.captureoutput = captureoutput
299 299
300 300 class TransactionUnavailable(RuntimeError):
301 301 pass
302 302
303 303 def _notransaction():
304 304 """default method to get a transaction while processing a bundle
305 305
306 306 Raise an exception to highlight the fact that no transaction was expected
307 307 to be created"""
308 308 raise TransactionUnavailable()
309 309
310 310 def applybundle(repo, unbundler, tr, source=None, url=None, op=None):
311 311 # transform me into unbundler.apply() as soon as the freeze is lifted
312 312 tr.hookargs['bundle2'] = '1'
313 313 if source is not None and 'source' not in tr.hookargs:
314 314 tr.hookargs['source'] = source
315 315 if url is not None and 'url' not in tr.hookargs:
316 316 tr.hookargs['url'] = url
317 317 return processbundle(repo, unbundler, lambda: tr, op=op)
318 318
319 319 def processbundle(repo, unbundler, transactiongetter=None, op=None):
320 320 """This function process a bundle, apply effect to/from a repo
321 321
322 322 It iterates over each part then searches for and uses the proper handling
323 323 code to process the part. Parts are processed in order.
324 324
325 325 An unknown mandatory part will abort the process.
326 326
327 327 It is temporarily possible to provide a prebuilt bundleoperation to the
328 328 function. This is used to ensure output is properly propagated in case of
329 329 an error during the unbundling. This output capturing part will likely be
330 330 reworked and this ability will probably go away in the process.
331 331 """
332 332 if op is None:
333 333 if transactiongetter is None:
334 334 transactiongetter = _notransaction
335 335 op = bundleoperation(repo, transactiongetter)
336 336 # todo:
337 337 # - replace this with an init function soon.
338 338 # - exception catching
339 339 unbundler.params
340 340 if repo.ui.debugflag:
341 341 msg = ['bundle2-input-bundle:']
342 342 if unbundler.params:
343 343 msg.append(' %i params' % len(unbundler.params))
344 344 if op.gettransaction is None:
345 345 msg.append(' no-transaction')
346 346 else:
347 347 msg.append(' with-transaction')
348 348 msg.append('\n')
349 349 repo.ui.debug(''.join(msg))
350 350 iterparts = enumerate(unbundler.iterparts())
351 351 part = None
352 352 nbpart = 0
353 353 try:
354 354 for nbpart, part in iterparts:
355 355 _processpart(op, part)
356 356 except Exception as exc:
357 357 # Any exceptions seeking to the end of the bundle at this point are
358 358 # almost certainly related to the underlying stream being bad.
359 359 # And, chances are that the exception we're handling is related to
360 360 # getting in that bad state. So, we swallow the seeking error and
361 361 # re-raise the original error.
362 362 seekerror = False
363 363 try:
364 364 for nbpart, part in iterparts:
365 365 # consume the bundle content
366 366 part.seek(0, 2)
367 367 except Exception:
368 368 seekerror = True
369 369
370 370 # Small hack to let caller code distinguish exceptions from bundle2
371 371 # processing from processing the old format. This is mostly
372 372 # needed to handle different return codes to unbundle according to the
373 373 # type of bundle. We should probably clean up or drop this return code
374 374 # craziness in a future version.
375 375 exc.duringunbundle2 = True
376 376 salvaged = []
377 377 replycaps = None
378 378 if op.reply is not None:
379 379 salvaged = op.reply.salvageoutput()
380 380 replycaps = op.reply.capabilities
381 381 exc._replycaps = replycaps
382 382 exc._bundle2salvagedoutput = salvaged
383 383
384 384 # Re-raising from a variable loses the original stack. So only use
385 385 # that form if we need to.
386 386 if seekerror:
387 387 raise exc
388 388 else:
389 389 raise
390 390 finally:
391 391 repo.ui.debug('bundle2-input-bundle: %i parts total\n' % nbpart)
392 392
393 393 return op
394 394
395 395 def _processpart(op, part):
396 396 """process a single part from a bundle
397 397
398 398 The part is guaranteed to have been fully consumed when the function exits
399 399 (even if an exception is raised)."""
400 400 status = 'unknown' # used by debug output
401 401 hardabort = False
402 402 try:
403 403 try:
404 404 handler = parthandlermapping.get(part.type)
405 405 if handler is None:
406 406 status = 'unsupported-type'
407 407 raise error.BundleUnknownFeatureError(parttype=part.type)
408 408 indebug(op.ui, 'found a handler for part %r' % part.type)
409 409 unknownparams = part.mandatorykeys - handler.params
410 410 if unknownparams:
411 411 unknownparams = list(unknownparams)
412 412 unknownparams.sort()
413 413 status = 'unsupported-params (%s)' % unknownparams
414 414 raise error.BundleUnknownFeatureError(parttype=part.type,
415 415 params=unknownparams)
416 416 status = 'supported'
417 417 except error.BundleUnknownFeatureError as exc:
418 418 if part.mandatory: # mandatory parts
419 419 raise
420 420 indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
421 421 return # skip to part processing
422 422 finally:
423 423 if op.ui.debugflag:
424 424 msg = ['bundle2-input-part: "%s"' % part.type]
425 425 if not part.mandatory:
426 426 msg.append(' (advisory)')
427 427 nbmp = len(part.mandatorykeys)
428 428 nbap = len(part.params) - nbmp
429 429 if nbmp or nbap:
430 430 msg.append(' (params:')
431 431 if nbmp:
432 432 msg.append(' %i mandatory' % nbmp)
433 433 if nbap:
434 434 msg.append(' %i advisory' % nbap)
435 435 msg.append(')')
436 436 msg.append(' %s\n' % status)
437 437 op.ui.debug(''.join(msg))
438 438
439 439 # handler is called outside the above try block so that we don't
440 440 # risk catching KeyErrors from anything other than the
441 441 # parthandlermapping lookup (any KeyError raised by handler()
442 442 # itself represents a defect of a different variety).
443 443 output = None
444 444 if op.captureoutput and op.reply is not None:
445 445 op.ui.pushbuffer(error=True, subproc=True)
446 446 output = ''
447 447 try:
448 448 handler(op, part)
449 449 finally:
450 450 if output is not None:
451 451 output = op.ui.popbuffer()
452 452 if output:
453 453 outpart = op.reply.newpart('output', data=output,
454 454 mandatory=False)
455 455 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
456 456 # If exiting or interrupted, do not attempt to seek the stream in the
457 457 # finally block below. This makes abort faster.
458 458 except (SystemExit, KeyboardInterrupt):
459 459 hardabort = True
460 460 raise
461 461 finally:
462 462 # consume the part content to not corrupt the stream.
463 463 if not hardabort:
464 464 part.seek(0, 2)
465 465
466 466
467 467 def decodecaps(blob):
468 468 """decode a bundle2 caps bytes blob into a dictionary
469 469
470 470 The blob is a list of capabilities (one per line)
471 471 Capabilities may have values using a line of the form::
472 472
473 473 capability=value1,value2,value3
474 474
475 475 The values are always a list."""
476 476 caps = {}
477 477 for line in blob.splitlines():
478 478 if not line:
479 479 continue
480 480 if '=' not in line:
481 481 key, vals = line, ()
482 482 else:
483 483 key, vals = line.split('=', 1)
484 484 vals = vals.split(',')
485 485 key = urlreq.unquote(key)
486 486 vals = [urlreq.unquote(v) for v in vals]
487 487 caps[key] = vals
488 488 return caps
489 489
490 490 def encodecaps(caps):
491 491 """encode a bundle2 caps dictionary into a bytes blob"""
492 492 chunks = []
493 493 for ca in sorted(caps):
494 494 vals = caps[ca]
495 495 ca = urlreq.quote(ca)
496 496 vals = [urlreq.quote(v) for v in vals]
497 497 if vals:
498 498 ca = "%s=%s" % (ca, ','.join(vals))
499 499 chunks.append(ca)
500 500 return '\n'.join(chunks)
501 501
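A quick round-trip of the two helpers above, assuming this module is importable as mercurial.bundle2; the capability names are borrowed from the `capabilities` dict further down and the values are only illustrative:

    from mercurial import bundle2

    caps = {'HG20': [], 'digests': ['md5', 'sha1'], 'error': ['abort']}
    blob = bundle2.encodecaps(caps)
    # blob == 'HG20\ndigests=md5,sha1\nerror=abort'
    assert bundle2.decodecaps(blob) == caps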
502 502 bundletypes = {
503 503 "": ("", 'UN'), # only when using unbundle on ssh and old http servers
504 504 # since the unification ssh accepts a header but there
505 505 # is no capability signaling it.
506 506 "HG20": (), # special-cased below
507 507 "HG10UN": ("HG10UN", 'UN'),
508 508 "HG10BZ": ("HG10", 'BZ'),
509 509 "HG10GZ": ("HG10GZ", 'GZ'),
510 510 }
511 511
512 512 # hgweb uses this list to communicate its preferred type
513 513 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
514 514
515 515 class bundle20(object):
516 516 """represent an outgoing bundle2 container
517 517
518 518 Use the `addparam` method to add a stream level parameter and `newpart` to
519 519 populate it. Then call `getchunks` to retrieve all the binary chunks of
520 520 data that compose the bundle2 container."""
521 521
522 522 _magicstring = 'HG20'
523 523
524 524 def __init__(self, ui, capabilities=()):
525 525 self.ui = ui
526 526 self._params = []
527 527 self._parts = []
528 528 self.capabilities = dict(capabilities)
529 529 self._compengine = util.compengines.forbundletype('UN')
530 530 self._compopts = None
531 531
532 532 def setcompression(self, alg, compopts=None):
533 533 """setup core part compression to <alg>"""
534 534 if alg in (None, 'UN'):
535 535 return
536 536 assert not any(n.lower() == 'compression' for n, v in self._params)
537 537 self.addparam('Compression', alg)
538 538 self._compengine = util.compengines.forbundletype(alg)
539 539 self._compopts = compopts
540 540
541 541 @property
542 542 def nbparts(self):
543 543 """total number of parts added to the bundler"""
544 544 return len(self._parts)
545 545
546 546 # methods used to define the bundle2 content
547 547 def addparam(self, name, value=None):
548 548 """add a stream level parameter"""
549 549 if not name:
550 550 raise ValueError('empty parameter name')
551 551 if name[0] not in string.letters:
552 552 raise ValueError('non letter first character: %r' % name)
553 553 self._params.append((name, value))
554 554
555 555 def addpart(self, part):
556 556 """add a new part to the bundle2 container
557 557
558 558 Parts contain the actual application payload.
559 559 assert part.id is None
560 560 part.id = len(self._parts) # very cheap counter
561 561 self._parts.append(part)
562 562
563 563 def newpart(self, typeid, *args, **kwargs):
564 564 """create a new part and add it to the containers
565 565
566 566 As the part is directly added to the containers. For now, this means
567 567 that any failure to properly initialize the part after calling
568 568 ``newpart`` should result in a failure of the whole bundling process.
569 569
570 570 You can still fall back to manually create and add if you need better
571 571 control."""
572 572 part = bundlepart(typeid, *args, **kwargs)
573 573 self.addpart(part)
574 574 return part
575 575
576 576 # methods used to generate the bundle2 stream
577 577 def getchunks(self):
578 578 if self.ui.debugflag:
579 579 msg = ['bundle2-output-bundle: "%s",' % self._magicstring]
580 580 if self._params:
581 581 msg.append(' (%i params)' % len(self._params))
582 582 msg.append(' %i parts total\n' % len(self._parts))
583 583 self.ui.debug(''.join(msg))
584 584 outdebug(self.ui, 'start emission of %s stream' % self._magicstring)
585 585 yield self._magicstring
586 586 param = self._paramchunk()
587 587 outdebug(self.ui, 'bundle parameter: %s' % param)
588 588 yield _pack(_fstreamparamsize, len(param))
589 589 if param:
590 590 yield param
591 591 for chunk in self._compengine.compressstream(self._getcorechunk(),
592 592 self._compopts):
593 593 yield chunk
594 594
595 595 def _paramchunk(self):
596 596 """return a encoded version of all stream parameters"""
597 597 blocks = []
598 598 for par, value in self._params:
599 599 par = urlreq.quote(par)
600 600 if value is not None:
601 601 value = urlreq.quote(value)
602 602 par = '%s=%s' % (par, value)
603 603 blocks.append(par)
604 604 return ' '.join(blocks)
605 605
606 606 def _getcorechunk(self):
607 607 """yield chunk for the core part of the bundle
608 608
609 609 (all but headers and parameters)"""
610 610 outdebug(self.ui, 'start of parts')
611 611 for part in self._parts:
612 612 outdebug(self.ui, 'bundle part: "%s"' % part.type)
613 613 for chunk in part.getchunks(ui=self.ui):
614 614 yield chunk
615 615 outdebug(self.ui, 'end of bundle')
616 616 yield _pack(_fpartheadersize, 0)
617 617
618 618
619 619 def salvageoutput(self):
620 620 """return a list with a copy of all output parts in the bundle
621 621
622 622 This is meant to be used during error handling to make sure we preserve
623 623 server output"""
624 624 salvaged = []
625 625 for part in self._parts:
626 626 if part.type.startswith('output'):
627 627 salvaged.append(part.copy())
628 628 return salvaged
629 629
630 630
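A hedged sketch of the bundler API described in the bundle20 docstring above, under the Python 2 string semantics this file targets; `uimod.ui()` here just stands in for a real ui object (normally `repo.ui`), and the 'output' part and its parameter values are merely illustrative:

    from mercurial import bundle2, ui as uimod

    bundler = bundle2.bundle20(uimod.ui())
    part = bundler.newpart('output', data='hello from the server\n',
                           mandatory=False)
    part.addparam('in-reply-to', '0', mandatory=False)
    raw = ''.join(bundler.getchunks())  # the complete HG20 byte stream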
631 631 class unpackermixin(object):
632 632 """A mixin to extract bytes and struct data from a stream"""
633 633
634 634 def __init__(self, fp):
635 635 self._fp = fp
636 636
637 637 def _unpack(self, format):
638 638 """unpack this struct format from the stream
639 639
640 640 This method is meant for internal usage by the bundle2 protocol only.
641 641 It directly manipulates the low level stream, including bundle2 level
642 642 instructions.
643 643
644 644 Do not use it to implement higher-level logic or methods."""
645 645 data = self._readexact(struct.calcsize(format))
646 646 return _unpack(format, data)
647 647
648 648 def _readexact(self, size):
649 649 """read exactly <size> bytes from the stream
650 650
651 651 This method is meant for internal usage by the bundle2 protocol only.
652 652 It directly manipulates the low level stream, including bundle2 level
653 653 instructions.
654 654
655 655 Do not use it to implement higher-level logic or methods."""
656 656 return changegroup.readexactly(self._fp, size)
657 657
658 658 def getunbundler(ui, fp, magicstring=None):
659 659 """return a valid unbundler object for a given magicstring"""
660 660 if magicstring is None:
661 661 magicstring = changegroup.readexactly(fp, 4)
662 662 magic, version = magicstring[0:2], magicstring[2:4]
663 663 if magic != 'HG':
664 664 raise error.Abort(_('not a Mercurial bundle'))
665 665 unbundlerclass = formatmap.get(version)
666 666 if unbundlerclass is None:
667 667 raise error.Abort(_('unknown bundle version %s') % version)
668 668 unbundler = unbundlerclass(ui, fp)
669 669 indebug(ui, 'start processing of %s stream' % magicstring)
670 670 return unbundler
671 671
672 672 class unbundle20(unpackermixin):
673 673 """interpret a bundle2 stream
674 674
675 675 This class is fed with a binary stream and yields parts through its
676 676 `iterparts` method."""
677 677
678 678 _magicstring = 'HG20'
679 679
680 680 def __init__(self, ui, fp):
681 681 """If header is specified, we do not read it out of the stream."""
682 682 self.ui = ui
683 683 self._compengine = util.compengines.forbundletype('UN')
684 684 self._compressed = None
685 685 super(unbundle20, self).__init__(fp)
686 686
687 687 @util.propertycache
688 688 def params(self):
689 689 """dictionary of stream level parameters"""
690 690 indebug(self.ui, 'reading bundle2 stream parameters')
691 691 params = {}
692 692 paramssize = self._unpack(_fstreamparamsize)[0]
693 693 if paramssize < 0:
694 694 raise error.BundleValueError('negative bundle param size: %i'
695 695 % paramssize)
696 696 if paramssize:
697 697 params = self._readexact(paramssize)
698 698 params = self._processallparams(params)
699 699 return params
700 700
701 701 def _processallparams(self, paramsblock):
702 702 """"""
703 703 params = util.sortdict()
704 704 for p in paramsblock.split(' '):
705 705 p = p.split('=', 1)
706 706 p = [urlreq.unquote(i) for i in p]
707 707 if len(p) < 2:
708 708 p.append(None)
709 709 self._processparam(*p)
710 710 params[p[0]] = p[1]
711 711 return params
712 712
713 713
714 714 def _processparam(self, name, value):
715 715 """process a parameter, applying its effect if needed
716 716
717 717 Parameters starting with a lower case letter are advisory and will be
718 718 ignored when unknown. Those starting with an upper case letter are
719 719 mandatory, and this function will raise a KeyError when they are unknown.
720 720
721 721 Note: no options are currently supported. Any input will either be
722 722 ignored or fail.
723 723 """
724 724 if not name:
725 725 raise ValueError('empty parameter name')
726 726 if name[0] not in string.letters:
727 727 raise ValueError('non letter first character: %r' % name)
728 728 try:
729 729 handler = b2streamparamsmap[name.lower()]
730 730 except KeyError:
731 731 if name[0].islower():
732 732 indebug(self.ui, "ignoring unknown parameter %r" % name)
733 733 else:
734 734 raise error.BundleUnknownFeatureError(params=(name,))
735 735 else:
736 736 handler(self, name, value)
737 737
738 738 def _forwardchunks(self):
739 739 """utility to transfer a bundle2 as binary
740 740
741 741 This is made necessary by the fact that the 'getbundle' command over 'ssh'
742 742 has no way to know when the reply ends, relying on the bundle being
743 743 interpreted to find its end. This is terrible and we are sorry, but we
744 744 needed to move forward to get general delta enabled.
745 745 """
746 746 yield self._magicstring
747 747 assert 'params' not in vars(self)
748 748 paramssize = self._unpack(_fstreamparamsize)[0]
749 749 if paramssize < 0:
750 750 raise error.BundleValueError('negative bundle param size: %i'
751 751 % paramssize)
752 752 yield _pack(_fstreamparamsize, paramssize)
753 753 if paramssize:
754 754 params = self._readexact(paramssize)
755 755 self._processallparams(params)
756 756 yield params
757 757 assert self._compengine.bundletype == 'UN'
758 758 # From there, payload might need to be decompressed
759 759 self._fp = self._compengine.decompressorreader(self._fp)
760 760 emptycount = 0
761 761 while emptycount < 2:
762 762 # so we can brainlessly loop
763 763 assert _fpartheadersize == _fpayloadsize
764 764 size = self._unpack(_fpartheadersize)[0]
765 765 yield _pack(_fpartheadersize, size)
766 766 if size:
767 767 emptycount = 0
768 768 else:
769 769 emptycount += 1
770 770 continue
771 771 if size == flaginterrupt:
772 772 continue
773 773 elif size < 0:
774 774 raise error.BundleValueError('negative chunk size: %i' % size)
775 775 yield self._readexact(size)
776 776
777 777
778 778 def iterparts(self):
779 779 """yield all parts contained in the stream"""
780 780 # make sure params have been loaded
781 781 self.params
782 782 # From there, payload needs to be decompressed
783 783 self._fp = self._compengine.decompressorreader(self._fp)
784 784 indebug(self.ui, 'start extraction of bundle2 parts')
785 785 headerblock = self._readpartheader()
786 786 while headerblock is not None:
787 787 part = unbundlepart(self.ui, headerblock, self._fp)
788 788 yield part
789 789 part.seek(0, 2)
790 790 headerblock = self._readpartheader()
791 791 indebug(self.ui, 'end of bundle2 stream')
792 792
793 793 def _readpartheader(self):
794 794 """reads a part header size and return the bytes blob
795 795
796 796 returns None if empty"""
797 797 headersize = self._unpack(_fpartheadersize)[0]
798 798 if headersize < 0:
799 799 raise error.BundleValueError('negative part header size: %i'
800 800 % headersize)
801 801 indebug(self.ui, 'part header size: %i' % headersize)
802 802 if headersize:
803 803 return self._readexact(headersize)
804 804 return None
805 805
806 806 def compressed(self):
807 807 self.params # load params
808 808 return self._compressed
809 809
810 810 def close(self):
811 811 """close underlying file"""
812 812 if util.safehasattr(self._fp, 'close'):
813 813 return self._fp.close()
814 814
815 815 formatmap = {'20': unbundle20}
816 816
817 817 b2streamparamsmap = {}
818 818
819 819 def b2streamparamhandler(name):
820 820 """register a handler for a stream level parameter"""
821 821 def decorator(func):
822 822 assert name not in b2streamparamsmap
823 823 b2streamparamsmap[name] = func
824 824 return func
825 825 return decorator
826 826
827 827 @b2streamparamhandler('compression')
828 828 def processcompression(unbundler, param, value):
829 829 """read compression parameter and install payload decompression"""
830 830 if value not in util.compengines.supportedbundletypes:
831 831 raise error.BundleUnknownFeatureError(params=(param,),
832 832 values=(value,))
833 833 unbundler._compengine = util.compengines.forbundletype(value)
834 834 if value is not None:
835 835 unbundler._compressed = True
836 836
837 837 class bundlepart(object):
838 838 """A bundle2 part contains application level payload
839 839
840 840 The part `type` is used to route the part to the application level
841 841 handler.
842 842
843 843 The part payload is contained in ``part.data``. It could be raw bytes or a
844 844 generator of byte chunks.
845 845
846 846 You can add parameters to the part using the ``addparam`` method.
847 847 Parameters can be either mandatory (default) or advisory. The remote side
848 848 should be able to safely ignore the advisory ones.
849 849
850 850 Neither data nor parameters can be modified after generation has begun.
851 851 """
852 852
853 853 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
854 854 data='', mandatory=True):
855 855 validateparttype(parttype)
856 856 self.id = None
857 857 self.type = parttype
858 858 self._data = data
859 859 self._mandatoryparams = list(mandatoryparams)
860 860 self._advisoryparams = list(advisoryparams)
861 861 # checking for duplicated entries
862 862 self._seenparams = set()
863 863 for pname, __ in self._mandatoryparams + self._advisoryparams:
864 864 if pname in self._seenparams:
865 865 raise error.ProgrammingError('duplicated params: %s' % pname)
866 866 self._seenparams.add(pname)
867 867 # status of the part's generation:
868 868 # - None: not started,
869 869 # - False: currently generated,
870 870 # - True: generation done.
871 871 self._generated = None
872 872 self.mandatory = mandatory
873 873
874 874 def __repr__(self):
875 875 cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
876 876 return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
877 877 % (cls, id(self), self.id, self.type, self.mandatory))
878 878
879 879 def copy(self):
880 880 """return a copy of the part
881 881
882 882 The new part has the very same content but no partid assigned yet.
883 883 Parts with generated data cannot be copied."""
884 884 assert not util.safehasattr(self.data, 'next')
885 885 return self.__class__(self.type, self._mandatoryparams,
886 886 self._advisoryparams, self._data, self.mandatory)
887 887
888 888 # methods used to define the part content
889 889 @property
890 890 def data(self):
891 891 return self._data
892 892
893 893 @data.setter
894 894 def data(self, data):
895 895 if self._generated is not None:
896 896 raise error.ReadOnlyPartError('part is being generated')
897 897 self._data = data
898 898
899 899 @property
900 900 def mandatoryparams(self):
901 901 # make it an immutable tuple to force people through ``addparam``
902 902 return tuple(self._mandatoryparams)
903 903
904 904 @property
905 905 def advisoryparams(self):
906 906 # make it an immutable tuple to force people through ``addparam``
907 907 return tuple(self._advisoryparams)
908 908
909 909 def addparam(self, name, value='', mandatory=True):
910 910 """add a parameter to the part
911 911
912 912 If 'mandatory' is set to True, the remote handler must claim support
913 913 for this parameter or the unbundling will be aborted.
914 914
915 915 The 'name' and 'value' cannot exceed 255 bytes each.
916 916 """
917 917 if self._generated is not None:
918 918 raise error.ReadOnlyPartError('part is being generated')
919 919 if name in self._seenparams:
920 920 raise ValueError('duplicated params: %s' % name)
921 921 self._seenparams.add(name)
922 922 params = self._advisoryparams
923 923 if mandatory:
924 924 params = self._mandatoryparams
925 925 params.append((name, value))
926 926
927 927 # methods used to generate the bundle2 stream
928 928 def getchunks(self, ui):
929 929 if self._generated is not None:
930 930 raise error.ProgrammingError('part can only be consumed once')
931 931 self._generated = False
932 932
933 933 if ui.debugflag:
934 934 msg = ['bundle2-output-part: "%s"' % self.type]
935 935 if not self.mandatory:
936 936 msg.append(' (advisory)')
937 937 nbmp = len(self.mandatoryparams)
938 938 nbap = len(self.advisoryparams)
939 939 if nbmp or nbap:
940 940 msg.append(' (params:')
941 941 if nbmp:
942 942 msg.append(' %i mandatory' % nbmp)
943 943 if nbap:
944 944 msg.append(' %i advisory' % nbap)
945 945 msg.append(')')
946 946 if not self.data:
947 947 msg.append(' empty payload')
948 948 elif util.safehasattr(self.data, 'next'):
949 949 msg.append(' streamed payload')
950 950 else:
951 951 msg.append(' %i bytes payload' % len(self.data))
952 952 msg.append('\n')
953 953 ui.debug(''.join(msg))
954 954
955 955 #### header
956 956 if self.mandatory:
957 957 parttype = self.type.upper()
958 958 else:
959 959 parttype = self.type.lower()
960 960 outdebug(ui, 'part %s: "%s"' % (self.id, parttype))
961 961 ## parttype
962 962 header = [_pack(_fparttypesize, len(parttype)),
963 963 parttype, _pack(_fpartid, self.id),
964 964 ]
965 965 ## parameters
966 966 # count
967 967 manpar = self.mandatoryparams
968 968 advpar = self.advisoryparams
969 969 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
970 970 # size
971 971 parsizes = []
972 972 for key, value in manpar:
973 973 parsizes.append(len(key))
974 974 parsizes.append(len(value))
975 975 for key, value in advpar:
976 976 parsizes.append(len(key))
977 977 parsizes.append(len(value))
978 978 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
979 979 header.append(paramsizes)
980 980 # key, value
981 981 for key, value in manpar:
982 982 header.append(key)
983 983 header.append(value)
984 984 for key, value in advpar:
985 985 header.append(key)
986 986 header.append(value)
987 987 ## finalize header
988 988 headerchunk = ''.join(header)
989 989 outdebug(ui, 'header chunk size: %i' % len(headerchunk))
990 990 yield _pack(_fpartheadersize, len(headerchunk))
991 991 yield headerchunk
992 992 ## payload
993 993 try:
994 994 for chunk in self._payloadchunks():
995 995 outdebug(ui, 'payload chunk size: %i' % len(chunk))
996 996 yield _pack(_fpayloadsize, len(chunk))
997 997 yield chunk
998 998 except GeneratorExit:
999 999 # GeneratorExit means that nobody is listening for our
1000 1000 # results anyway, so just bail quickly rather than trying
1001 1001 # to produce an error part.
1002 1002 ui.debug('bundle2-generatorexit\n')
1003 1003 raise
1004 1004 except BaseException as exc:
1005 1005 # backup exception data for later
1006 1006 ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
1007 1007 % exc)
1008 exc_info = sys.exc_info()
1008 tb = sys.exc_info()[2]
1009 1009 msg = 'unexpected error: %s' % exc
1010 1010 interpart = bundlepart('error:abort', [('message', msg)],
1011 1011 mandatory=False)
1012 1012 interpart.id = 0
1013 1013 yield _pack(_fpayloadsize, -1)
1014 1014 for chunk in interpart.getchunks(ui=ui):
1015 1015 yield chunk
1016 1016 outdebug(ui, 'closing payload chunk')
1017 1017 # abort current part payload
1018 1018 yield _pack(_fpayloadsize, 0)
1019 if pycompat.ispy3:
1020 raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
1021 else:
1022 exec("""raise exc_info[0], exc_info[1], exc_info[2]""")
1019 pycompat.raisewithtb(exc, tb)
1023 1020 # end of payload
1024 1021 outdebug(ui, 'closing payload chunk')
1025 1022 yield _pack(_fpayloadsize, 0)
1026 1023 self._generated = True
1027 1024
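The hunk above is the point of this changeset: the inline Python 2/Python 3 re-raise dance is replaced by a single call to `pycompat.raisewithtb(exc, tb)`. The helper itself lives in mercurial/pycompat.py and is not shown in this diff; presumably it looks roughly like the code it replaces, along these lines (a sketch, not the actual definition):

    import sys

    if sys.version_info[0] >= 3:
        def raisewithtb(exc, tb):
            """re-raise exc with the given traceback attached"""
            raise exc.with_traceback(tb)
    else:
        def raisewithtb(exc, tb):
            """re-raise exc with the given traceback attached"""
            # the Python 2 three-argument raise is a SyntaxError on
            # Python 3, so it has to be hidden inside exec()
            exec('raise exc, None, tb')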
1028 1025 def _payloadchunks(self):
1029 1026 """yield chunks of a the part payload
1030 1027
1031 1028 Exists to handle the different methods to provide data to a part."""
1032 1029 # we only support fixed size data now.
1033 1030 # This will be improved in the future.
1034 1031 if util.safehasattr(self.data, 'next'):
1035 1032 buff = util.chunkbuffer(self.data)
1036 1033 chunk = buff.read(preferedchunksize)
1037 1034 while chunk:
1038 1035 yield chunk
1039 1036 chunk = buff.read(preferedchunksize)
1040 1037 elif len(self.data):
1041 1038 yield self.data
1042 1039
1043 1040
1044 1041 flaginterrupt = -1
1045 1042
1046 1043 class interrupthandler(unpackermixin):
1047 1044 """read one part and process it with restricted capability
1048 1045
1049 1046 This allows transmitting exceptions raised on the producer side during part
1050 1047 iteration while the consumer is reading a part.
1051 1048
1052 1049 Parts processed in this manner only have access to a ui object."""
1053 1050
1054 1051 def __init__(self, ui, fp):
1055 1052 super(interrupthandler, self).__init__(fp)
1056 1053 self.ui = ui
1057 1054
1058 1055 def _readpartheader(self):
1059 1056 """reads a part header size and return the bytes blob
1060 1057
1061 1058 returns None if empty"""
1062 1059 headersize = self._unpack(_fpartheadersize)[0]
1063 1060 if headersize < 0:
1064 1061 raise error.BundleValueError('negative part header size: %i'
1065 1062 % headersize)
1066 1063 indebug(self.ui, 'part header size: %i\n' % headersize)
1067 1064 if headersize:
1068 1065 return self._readexact(headersize)
1069 1066 return None
1070 1067
1071 1068 def __call__(self):
1072 1069
1073 1070 self.ui.debug('bundle2-input-stream-interrupt:'
1074 1071 ' opening out of band context\n')
1075 1072 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1076 1073 headerblock = self._readpartheader()
1077 1074 if headerblock is None:
1078 1075 indebug(self.ui, 'no part found during interruption.')
1079 1076 return
1080 1077 part = unbundlepart(self.ui, headerblock, self._fp)
1081 1078 op = interruptoperation(self.ui)
1082 1079 _processpart(op, part)
1083 1080 self.ui.debug('bundle2-input-stream-interrupt:'
1084 1081 ' closing out of band context\n')
1085 1082
1086 1083 class interruptoperation(object):
1087 1084 """A limited operation to be use by part handler during interruption
1088 1085
1089 1086 It only have access to an ui object.
1090 1087 """
1091 1088
1092 1089 def __init__(self, ui):
1093 1090 self.ui = ui
1094 1091 self.reply = None
1095 1092 self.captureoutput = False
1096 1093
1097 1094 @property
1098 1095 def repo(self):
1099 1096 raise error.ProgrammingError('no repo access from stream interruption')
1100 1097
1101 1098 def gettransaction(self):
1102 1099 raise TransactionUnavailable('no repo access from stream interruption')
1103 1100
1104 1101 class unbundlepart(unpackermixin):
1105 1102 """a bundle part read from a bundle"""
1106 1103
1107 1104 def __init__(self, ui, header, fp):
1108 1105 super(unbundlepart, self).__init__(fp)
1109 1106 self._seekable = (util.safehasattr(fp, 'seek') and
1110 1107 util.safehasattr(fp, 'tell'))
1111 1108 self.ui = ui
1112 1109 # unbundle state attr
1113 1110 self._headerdata = header
1114 1111 self._headeroffset = 0
1115 1112 self._initialized = False
1116 1113 self.consumed = False
1117 1114 # part data
1118 1115 self.id = None
1119 1116 self.type = None
1120 1117 self.mandatoryparams = None
1121 1118 self.advisoryparams = None
1122 1119 self.params = None
1123 1120 self.mandatorykeys = ()
1124 1121 self._payloadstream = None
1125 1122 self._readheader()
1126 1123 self._mandatory = None
1127 1124 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1128 1125 self._pos = 0
1129 1126
1130 1127 def _fromheader(self, size):
1131 1128 """return the next <size> byte from the header"""
1132 1129 offset = self._headeroffset
1133 1130 data = self._headerdata[offset:(offset + size)]
1134 1131 self._headeroffset = offset + size
1135 1132 return data
1136 1133
1137 1134 def _unpackheader(self, format):
1138 1135 """read given format from header
1139 1136
1140 1137 This automatically computes the size of the format to read.
1141 1138 data = self._fromheader(struct.calcsize(format))
1142 1139 return _unpack(format, data)
1143 1140
1144 1141 def _initparams(self, mandatoryparams, advisoryparams):
1145 1142 """internal function to setup all logic related parameters"""
1146 1143 # make it read only to prevent people touching it by mistake.
1147 1144 self.mandatoryparams = tuple(mandatoryparams)
1148 1145 self.advisoryparams = tuple(advisoryparams)
1149 1146 # user friendly UI
1150 1147 self.params = util.sortdict(self.mandatoryparams)
1151 1148 self.params.update(self.advisoryparams)
1152 1149 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1153 1150
1154 1151 def _payloadchunks(self, chunknum=0):
1155 1152 '''seek to specified chunk and start yielding data'''
1156 1153 if len(self._chunkindex) == 0:
1157 1154 assert chunknum == 0, 'Must start with chunk 0'
1158 1155 self._chunkindex.append((0, self._tellfp()))
1159 1156 else:
1160 1157 assert chunknum < len(self._chunkindex), \
1161 1158 'Unknown chunk %d' % chunknum
1162 1159 self._seekfp(self._chunkindex[chunknum][1])
1163 1160
1164 1161 pos = self._chunkindex[chunknum][0]
1165 1162 payloadsize = self._unpack(_fpayloadsize)[0]
1166 1163 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1167 1164 while payloadsize:
1168 1165 if payloadsize == flaginterrupt:
1169 1166 # interruption detection, the handler will now read a
1170 1167 # single part and process it.
1171 1168 interrupthandler(self.ui, self._fp)()
1172 1169 elif payloadsize < 0:
1173 1170 msg = 'negative payload chunk size: %i' % payloadsize
1174 1171 raise error.BundleValueError(msg)
1175 1172 else:
1176 1173 result = self._readexact(payloadsize)
1177 1174 chunknum += 1
1178 1175 pos += payloadsize
1179 1176 if chunknum == len(self._chunkindex):
1180 1177 self._chunkindex.append((pos, self._tellfp()))
1181 1178 yield result
1182 1179 payloadsize = self._unpack(_fpayloadsize)[0]
1183 1180 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1184 1181
1185 1182 def _findchunk(self, pos):
1186 1183 '''for a given payload position, return a chunk number and offset'''
1187 1184 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1188 1185 if ppos == pos:
1189 1186 return chunk, 0
1190 1187 elif ppos > pos:
1191 1188 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1192 1189 raise ValueError('Unknown chunk')
1193 1190
1194 1191 def _readheader(self):
1195 1192 """read the header and setup the object"""
1196 1193 typesize = self._unpackheader(_fparttypesize)[0]
1197 1194 self.type = self._fromheader(typesize)
1198 1195 indebug(self.ui, 'part type: "%s"' % self.type)
1199 1196 self.id = self._unpackheader(_fpartid)[0]
1200 1197 indebug(self.ui, 'part id: "%s"' % self.id)
1201 1198 # extract mandatory bit from type
1202 1199 self.mandatory = (self.type != self.type.lower())
1203 1200 self.type = self.type.lower()
1204 1201 ## reading parameters
1205 1202 # param count
1206 1203 mancount, advcount = self._unpackheader(_fpartparamcount)
1207 1204 indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
1208 1205 # param size
1209 1206 fparamsizes = _makefpartparamsizes(mancount + advcount)
1210 1207 paramsizes = self._unpackheader(fparamsizes)
1211 1208 # make it a list of pairs again
1212 1209 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
1213 1210 # split mandatory from advisory
1214 1211 mansizes = paramsizes[:mancount]
1215 1212 advsizes = paramsizes[mancount:]
1216 1213 # retrieve param value
1217 1214 manparams = []
1218 1215 for key, value in mansizes:
1219 1216 manparams.append((self._fromheader(key), self._fromheader(value)))
1220 1217 advparams = []
1221 1218 for key, value in advsizes:
1222 1219 advparams.append((self._fromheader(key), self._fromheader(value)))
1223 1220 self._initparams(manparams, advparams)
1224 1221 ## part payload
1225 1222 self._payloadstream = util.chunkbuffer(self._payloadchunks())
1226 1223 # we read the data, tell it
1227 1224 self._initialized = True
1228 1225
1229 1226 def read(self, size=None):
1230 1227 """read payload data"""
1231 1228 if not self._initialized:
1232 1229 self._readheader()
1233 1230 if size is None:
1234 1231 data = self._payloadstream.read()
1235 1232 else:
1236 1233 data = self._payloadstream.read(size)
1237 1234 self._pos += len(data)
1238 1235 if size is None or len(data) < size:
1239 1236 if not self.consumed and self._pos:
1240 1237 self.ui.debug('bundle2-input-part: total payload size %i\n'
1241 1238 % self._pos)
1242 1239 self.consumed = True
1243 1240 return data
1244 1241
1245 1242 def tell(self):
1246 1243 return self._pos
1247 1244
1248 1245 def seek(self, offset, whence=0):
1249 1246 if whence == 0:
1250 1247 newpos = offset
1251 1248 elif whence == 1:
1252 1249 newpos = self._pos + offset
1253 1250 elif whence == 2:
1254 1251 if not self.consumed:
1255 1252 self.read()
1256 1253 newpos = self._chunkindex[-1][0] - offset
1257 1254 else:
1258 1255 raise ValueError('Unknown whence value: %r' % (whence,))
1259 1256
1260 1257 if newpos > self._chunkindex[-1][0] and not self.consumed:
1261 1258 self.read()
1262 1259 if not 0 <= newpos <= self._chunkindex[-1][0]:
1263 1260 raise ValueError('Offset out of range')
1264 1261
1265 1262 if self._pos != newpos:
1266 1263 chunk, internaloffset = self._findchunk(newpos)
1267 1264 self._payloadstream = util.chunkbuffer(self._payloadchunks(chunk))
1268 1265 adjust = self.read(internaloffset)
1269 1266 if len(adjust) != internaloffset:
1270 1267 raise error.Abort(_('Seek failed\n'))
1271 1268 self._pos = newpos
1272 1269
1273 1270 def _seekfp(self, offset, whence=0):
1274 1271 """move the underlying file pointer
1275 1272
1276 1273 This method is meant for internal usage by the bundle2 protocol only.
1277 1274 It directly manipulates the low level stream, including bundle2 level
1278 1275 instructions.
1279 1276
1280 1277 Do not use it to implement higher-level logic or methods."""
1281 1278 if self._seekable:
1282 1279 return self._fp.seek(offset, whence)
1283 1280 else:
1284 1281 raise NotImplementedError(_('File pointer is not seekable'))
1285 1282
1286 1283 def _tellfp(self):
1287 1284 """return the file offset, or None if file is not seekable
1288 1285
1289 1286 This method is meant for internal usage by the bundle2 protocol only.
1290 1287 It directly manipulates the low level stream, including bundle2 level
1291 1288 instructions.
1292 1289
1293 1290 Do not use it to implement higher-level logic or methods."""
1294 1291 if self._seekable:
1295 1292 try:
1296 1293 return self._fp.tell()
1297 1294 except IOError as e:
1298 1295 if e.errno == errno.ESPIPE:
1299 1296 self._seekable = False
1300 1297 else:
1301 1298 raise
1302 1299 return None
1303 1300
1304 1301 # These are only the static capabilities.
1305 1302 # Check the 'getrepocaps' function for the rest.
1306 1303 capabilities = {'HG20': (),
1307 1304 'error': ('abort', 'unsupportedcontent', 'pushraced',
1308 1305 'pushkey'),
1309 1306 'listkeys': (),
1310 1307 'pushkey': (),
1311 1308 'digests': tuple(sorted(util.DIGESTS.keys())),
1312 1309 'remote-changegroup': ('http', 'https'),
1313 1310 'hgtagsfnodes': (),
1314 1311 }
1315 1312
1316 1313 def getrepocaps(repo, allowpushback=False):
1317 1314 """return the bundle2 capabilities for a given repo
1318 1315
1319 1316 Exists to allow extensions (like evolution) to mutate the capabilities.
1320 1317 """
1321 1318 caps = capabilities.copy()
1322 1319 caps['changegroup'] = tuple(sorted(
1323 1320 changegroup.supportedincomingversions(repo)))
1324 1321 if obsolete.isenabled(repo, obsolete.exchangeopt):
1325 1322 supportedformat = tuple('V%i' % v for v in obsolete.formats)
1326 1323 caps['obsmarkers'] = supportedformat
1327 1324 if allowpushback:
1328 1325 caps['pushback'] = ()
1329 1326 return caps
1330 1327
1331 1328 def bundle2caps(remote):
1332 1329 """return the bundle capabilities of a peer as dict"""
1333 1330 raw = remote.capable('bundle2')
1334 1331 if not raw and raw != '':
1335 1332 return {}
1336 1333 capsblob = urlreq.unquote(remote.capable('bundle2'))
1337 1334 return decodecaps(capsblob)
1338 1335
1339 1336 def obsmarkersversion(caps):
1340 1337 """extract the list of supported obsmarkers versions from a bundle2caps dict
1341 1338 """
1342 1339 obscaps = caps.get('obsmarkers', ())
1343 1340 return [int(c[1:]) for c in obscaps if c.startswith('V')]
1344 1341
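A tiny worked example of `obsmarkersversion()` above, using made-up values of the shape produced by `decodecaps()` or `getrepocaps()`:

    from mercurial import bundle2

    caps = {'obsmarkers': ('V0', 'V1'), 'pushkey': ()}
    assert bundle2.obsmarkersversion(caps) == [0, 1]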
1345 1342 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
1346 1343 compopts=None):
1347 1344 """Write a bundle file and return its filename.
1348 1345
1349 1346 Existing files will not be overwritten.
1350 1347 If no filename is specified, a temporary file is created.
1351 1348 bz2 compression can be turned off.
1352 1349 The bundle file will be deleted in case of errors.
1353 1350 """
1354 1351
1355 1352 if bundletype == "HG20":
1356 1353 bundle = bundle20(ui)
1357 1354 bundle.setcompression(compression, compopts)
1358 1355 part = bundle.newpart('changegroup', data=cg.getchunks())
1359 1356 part.addparam('version', cg.version)
1360 1357 if 'clcount' in cg.extras:
1361 1358 part.addparam('nbchanges', str(cg.extras['clcount']),
1362 1359 mandatory=False)
1363 1360 chunkiter = bundle.getchunks()
1364 1361 else:
1365 1362 # compression argument is only for the bundle2 case
1366 1363 assert compression is None
1367 1364 if cg.version != '01':
1368 1365 raise error.Abort(_('old bundle types only supports v1 '
1369 1366 'changegroups'))
1370 1367 header, comp = bundletypes[bundletype]
1371 1368 if comp not in util.compengines.supportedbundletypes:
1372 1369 raise error.Abort(_('unknown stream compression type: %s')
1373 1370 % comp)
1374 1371 compengine = util.compengines.forbundletype(comp)
1375 1372 def chunkiter():
1376 1373 yield header
1377 1374 for chunk in compengine.compressstream(cg.getchunks(), compopts):
1378 1375 yield chunk
1379 1376 chunkiter = chunkiter()
1380 1377
1381 1378 # parse the changegroup data, otherwise we will block
1382 1379 # in case of sshrepo because we don't know the end of the stream
1383 1380 return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
1384 1381
1385 1382 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest'))
1386 1383 def handlechangegroup(op, inpart):
1387 1384 """apply a changegroup part on the repo
1388 1385
1389 1386 This is a very early implementation that will be massively reworked before
1390 1387 being inflicted on any end user.
1391 1388 """
1392 1389 # Make sure we trigger a transaction creation
1393 1390 #
1394 1391 # The addchangegroup function will get a transaction object by itself, but
1395 1392 # we need to make sure we trigger the creation of a transaction object used
1396 1393 # for the whole processing scope.
1397 1394 op.gettransaction()
1398 1395 unpackerversion = inpart.params.get('version', '01')
1399 1396 # We should raise an appropriate exception here
1400 1397 cg = changegroup.getunbundler(unpackerversion, inpart, None)
1401 1398 # the source and url passed here are overwritten by the one contained in
1402 1399 # the transaction.hookargs argument. So 'bundle2' is a placeholder
1403 1400 nbchangesets = None
1404 1401 if 'nbchanges' in inpart.params:
1405 1402 nbchangesets = int(inpart.params.get('nbchanges'))
1406 1403 if ('treemanifest' in inpart.params and
1407 1404 'treemanifest' not in op.repo.requirements):
1408 1405 if len(op.repo.changelog) != 0:
1409 1406 raise error.Abort(_(
1410 1407 "bundle contains tree manifests, but local repo is "
1411 1408 "non-empty and does not use tree manifests"))
1412 1409 op.repo.requirements.add('treemanifest')
1413 1410 op.repo._applyopenerreqs()
1414 1411 op.repo._writerequirements()
1415 1412 ret = cg.apply(op.repo, 'bundle2', 'bundle2', expectedtotal=nbchangesets)
1416 1413 op.records.add('changegroup', {'return': ret})
1417 1414 if op.reply is not None:
1418 1415 # This is definitely not the final form of this
1419 1416 # return. But one needs to start somewhere.
1420 1417 part = op.reply.newpart('reply:changegroup', mandatory=False)
1421 1418 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1422 1419 part.addparam('return', '%i' % ret, mandatory=False)
1423 1420 assert not inpart.read()
1424 1421
1425 1422 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
1426 1423 ['digest:%s' % k for k in util.DIGESTS.keys()])
1427 1424 @parthandler('remote-changegroup', _remotechangegroupparams)
1428 1425 def handleremotechangegroup(op, inpart):
1429 1426 """apply a bundle10 on the repo, given an url and validation information
1430 1427
1431 1428 All the information about the remote bundle to import is given as
1432 1429 parameters. The parameters include:
1433 1430 - url: the url to the bundle10.
1434 1431 - size: the bundle10 file size. It is used to validate what was
1435 1432 retrieved by the client matches the server knowledge about the bundle.
1436 1433 - digests: a space separated list of the digest types provided as
1437 1434 parameters.
1438 1435 - digest:<digest-type>: the hexadecimal representation of the digest with
1439 1436 that name. Like the size, it is used to validate what was retrieved by
1440 1437 the client matches what the server knows about the bundle.
1441 1438
1442 1439 When multiple digest types are given, all of them are checked.
1443 1440 """
1444 1441 try:
1445 1442 raw_url = inpart.params['url']
1446 1443 except KeyError:
1447 1444 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
1448 1445 parsed_url = util.url(raw_url)
1449 1446 if parsed_url.scheme not in capabilities['remote-changegroup']:
1450 1447 raise error.Abort(_('remote-changegroup does not support %s urls') %
1451 1448 parsed_url.scheme)
1452 1449
1453 1450 try:
1454 1451 size = int(inpart.params['size'])
1455 1452 except ValueError:
1456 1453 raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
1457 1454 % 'size')
1458 1455 except KeyError:
1459 1456 raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
1460 1457
1461 1458 digests = {}
1462 1459 for typ in inpart.params.get('digests', '').split():
1463 1460 param = 'digest:%s' % typ
1464 1461 try:
1465 1462 value = inpart.params[param]
1466 1463 except KeyError:
1467 1464 raise error.Abort(_('remote-changegroup: missing "%s" param') %
1468 1465 param)
1469 1466 digests[typ] = value
1470 1467
1471 1468 real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
1472 1469
1473 1470 # Make sure we trigger a transaction creation
1474 1471 #
1475 1472 # The addchangegroup function will get a transaction object by itself, but
1476 1473 # we need to make sure we trigger the creation of a transaction object used
1477 1474 # for the whole processing scope.
1478 1475 op.gettransaction()
1479 1476 from . import exchange
1480 1477 cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
1481 1478 if not isinstance(cg, changegroup.cg1unpacker):
1482 1479 raise error.Abort(_('%s: not a bundle version 1.0') %
1483 1480 util.hidepassword(raw_url))
1484 1481 ret = cg.apply(op.repo, 'bundle2', 'bundle2')
1485 1482 op.records.add('changegroup', {'return': ret})
1486 1483 if op.reply is not None:
1487 1484 # This is definitely not the final form of this
1488 1485 # return. But one needs to start somewhere.
1489 1486 part = op.reply.newpart('reply:changegroup')
1490 1487 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
1491 1488 part.addparam('return', '%i' % ret, mandatory=False)
1492 1489 try:
1493 1490 real_part.validate()
1494 1491 except error.Abort as e:
1495 1492 raise error.Abort(_('bundle at %s is corrupted:\n%s') %
1496 1493 (util.hidepassword(raw_url), str(e)))
1497 1494 assert not inpart.read()
1498 1495
1499 1496 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
1500 1497 def handlereplychangegroup(op, inpart):
1501 1498 ret = int(inpart.params['return'])
1502 1499 replyto = int(inpart.params['in-reply-to'])
1503 1500 op.records.add('changegroup', {'return': ret}, replyto)
1504 1501
1505 1502 @parthandler('check:heads')
1506 1503 def handlecheckheads(op, inpart):
1507 1504 """check that head of the repo did not change
1508 1505
1509 1506 This is used to detect a push race when using unbundle.
1510 1507 This replaces the "heads" argument of unbundle."""
1511 1508 h = inpart.read(20)
1512 1509 heads = []
1513 1510 while len(h) == 20:
1514 1511 heads.append(h)
1515 1512 h = inpart.read(20)
1516 1513 assert not h
1517 1514 # Trigger a transaction so that we are guaranteed to have the lock now.
1518 1515 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1519 1516 op.gettransaction()
1520 1517 if sorted(heads) != sorted(op.repo.heads()):
1521 1518 raise error.PushRaced('repository changed while pushing - '
1522 1519 'please try again')
1523 1520
1524 1521 @parthandler('output')
1525 1522 def handleoutput(op, inpart):
1526 1523 """forward output captured on the server to the client"""
1527 1524 for line in inpart.read().splitlines():
1528 1525 op.ui.status(_('remote: %s\n') % line)
1529 1526
1530 1527 @parthandler('replycaps')
1531 1528 def handlereplycaps(op, inpart):
1532 1529 """Notify that a reply bundle should be created
1533 1530
1534 1531 The payload contains the capabilities information for the reply"""
1535 1532 caps = decodecaps(inpart.read())
1536 1533 if op.reply is None:
1537 1534 op.reply = bundle20(op.ui, caps)
1538 1535
1539 1536 class AbortFromPart(error.Abort):
1540 1537 """Sub-class of Abort that denotes an error from a bundle2 part."""
1541 1538
1542 1539 @parthandler('error:abort', ('message', 'hint'))
1543 1540 def handleerrorabort(op, inpart):
1544 1541 """Used to transmit abort error over the wire"""
1545 1542 raise AbortFromPart(inpart.params['message'],
1546 1543 hint=inpart.params.get('hint'))
1547 1544
1548 1545 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
1549 1546 'in-reply-to'))
1550 1547 def handleerrorpushkey(op, inpart):
1551 1548 """Used to transmit failure of a mandatory pushkey over the wire"""
1552 1549 kwargs = {}
1553 1550 for name in ('namespace', 'key', 'new', 'old', 'ret'):
1554 1551 value = inpart.params.get(name)
1555 1552 if value is not None:
1556 1553 kwargs[name] = value
1557 1554 raise error.PushkeyFailed(inpart.params['in-reply-to'], **kwargs)
1558 1555
1559 1556 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
1560 1557 def handleerrorunsupportedcontent(op, inpart):
1561 1558 """Used to transmit unknown content error over the wire"""
1562 1559 kwargs = {}
1563 1560 parttype = inpart.params.get('parttype')
1564 1561 if parttype is not None:
1565 1562 kwargs['parttype'] = parttype
1566 1563 params = inpart.params.get('params')
1567 1564 if params is not None:
1568 1565 kwargs['params'] = params.split('\0')
1569 1566
1570 1567 raise error.BundleUnknownFeatureError(**kwargs)
1571 1568
1572 1569 @parthandler('error:pushraced', ('message',))
1573 1570 def handleerrorpushraced(op, inpart):
1574 1571 """Used to transmit push race error over the wire"""
1575 1572 raise error.ResponseError(_('push failed:'), inpart.params['message'])
1576 1573
1577 1574 @parthandler('listkeys', ('namespace',))
1578 1575 def handlelistkeys(op, inpart):
1579 1576 """retrieve pushkey namespace content stored in a bundle2"""
1580 1577 namespace = inpart.params['namespace']
1581 1578 r = pushkey.decodekeys(inpart.read())
1582 1579 op.records.add('listkeys', (namespace, r))
1583 1580
1584 1581 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
1585 1582 def handlepushkey(op, inpart):
1586 1583 """process a pushkey request"""
1587 1584 dec = pushkey.decode
1588 1585 namespace = dec(inpart.params['namespace'])
1589 1586 key = dec(inpart.params['key'])
1590 1587 old = dec(inpart.params['old'])
1591 1588 new = dec(inpart.params['new'])
1592 1589 # Grab the transaction to ensure that we have the lock before performing the
1593 1590 # pushkey.
1594 1591 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1595 1592 op.gettransaction()
1596 1593 ret = op.repo.pushkey(namespace, key, old, new)
1597 1594 record = {'namespace': namespace,
1598 1595 'key': key,
1599 1596 'old': old,
1600 1597 'new': new}
1601 1598 op.records.add('pushkey', record)
1602 1599 if op.reply is not None:
1603 1600 rpart = op.reply.newpart('reply:pushkey')
1604 1601 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1605 1602 rpart.addparam('return', '%i' % ret, mandatory=False)
1606 1603 if inpart.mandatory and not ret:
1607 1604 kwargs = {}
1608 1605 for key in ('namespace', 'key', 'new', 'old', 'ret'):
1609 1606 if key in inpart.params:
1610 1607 kwargs[key] = inpart.params[key]
1611 1608 raise error.PushkeyFailed(partid=str(inpart.id), **kwargs)
1612 1609
1613 1610 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
1614 1611 def handlepushkeyreply(op, inpart):
1615 1612 """retrieve the result of a pushkey request"""
1616 1613 ret = int(inpart.params['return'])
1617 1614 partid = int(inpart.params['in-reply-to'])
1618 1615 op.records.add('pushkey', {'return': ret}, partid)
1619 1616
1620 1617 @parthandler('obsmarkers')
1621 1618 def handleobsmarker(op, inpart):
1622 1619 """add a stream of obsmarkers to the repo"""
1623 1620 tr = op.gettransaction()
1624 1621 markerdata = inpart.read()
1625 1622 if op.ui.config('experimental', 'obsmarkers-exchange-debug', False):
1626 1623 op.ui.write(('obsmarker-exchange: %i bytes received\n')
1627 1624 % len(markerdata))
1628 1625 # The mergemarkers call will crash if marker creation is not enabled.
1629 1626 # We want to avoid this if the part is advisory.
1630 1627 if not inpart.mandatory and op.repo.obsstore.readonly:
1631 1628 op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
1632 1629 return
1633 1630 new = op.repo.obsstore.mergemarkers(tr, markerdata)
1634 1631 if new:
1635 1632 op.repo.ui.status(_('%i new obsolescence markers\n') % new)
1636 1633 op.records.add('obsmarkers', {'new': new})
1637 1634 if op.reply is not None:
1638 1635 rpart = op.reply.newpart('reply:obsmarkers')
1639 1636 rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)
1640 1637 rpart.addparam('new', '%i' % new, mandatory=False)
1641 1638
1642 1639
1643 1640 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
1644 1641 def handleobsmarkerreply(op, inpart):
1645 1642 """retrieve the result of a pushkey request"""
1646 1643 ret = int(inpart.params['new'])
1647 1644 partid = int(inpart.params['in-reply-to'])
1648 1645 op.records.add('obsmarkers', {'new': ret}, partid)
1649 1646
1650 1647 @parthandler('hgtagsfnodes')
1651 1648 def handlehgtagsfnodes(op, inpart):
1652 1649 """Applies .hgtags fnodes cache entries to the local repo.
1653 1650
1654 1651 Payload is pairs of 20 byte changeset nodes and filenodes.
1655 1652 """
1656 1653 # Grab the transaction so we ensure that we have the lock at this point.
1657 1654 if op.ui.configbool('experimental', 'bundle2lazylocking'):
1658 1655 op.gettransaction()
1659 1656 cache = tags.hgtagsfnodescache(op.repo.unfiltered())
1660 1657
1661 1658 count = 0
1662 1659 while True:
1663 1660 node = inpart.read(20)
1664 1661 fnode = inpart.read(20)
1665 1662 if len(node) < 20 or len(fnode) < 20:
1666 1663 op.ui.debug('ignoring incomplete received .hgtags fnodes data\n')
1667 1664 break
1668 1665 cache.setfnode(node, fnode)
1669 1666 count += 1
1670 1667
1671 1668 cache.write()
1672 1669 op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
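All of the part handlers above follow the same shape: register the part type with @parthandler, read the payload from inpart, optionally force a transaction so the lock is held, record the outcome on op.records, and, when a reply bundle exists, answer with a 'reply:*' part. As a rough sketch of that pattern only (not code from this changeset; the part name 'x-example:ping', its 'message' parameter, and the reply part name are hypothetical), an extension-side handler could look like this:

from mercurial import bundle2

@bundle2.parthandler('x-example:ping', ('message',))
def handleping(op, inpart):
    """hypothetical handler illustrating the pattern used by the handlers above"""
    msg = inpart.params['message']
    # drain the payload, as the handlers above do before returning
    payload = inpart.read()
    op.ui.status('ping: %s\n' % msg)
    op.records.add('x-example:ping', {'message': msg, 'bytes': len(payload)})
    if op.reply is not None:
        rpart = op.reply.newpart('reply:x-example:ping', mandatory=False)
        rpart.addparam('in-reply-to', str(inpart.id), mandatory=False)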
@@ -1,412 +1,420 b''
1 1 # pycompat.py - portability shim for python 3
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5
6 6 """Mercurial portability shim for python 3.
7 7
8 8 This contains aliases to hide python version-specific details from the core.
9 9 """
10 10
11 11 from __future__ import absolute_import
12 12
13 13 import getopt
14 14 import os
15 15 import shlex
16 16 import sys
17 17
18 18 ispy3 = (sys.version_info[0] >= 3)
19 19
20 20 if not ispy3:
21 21 import cookielib
22 22 import cPickle as pickle
23 23 import httplib
24 24 import Queue as _queue
25 25 import SocketServer as socketserver
26 26 import xmlrpclib
27 27 else:
28 28 import http.cookiejar as cookielib
29 29 import http.client as httplib
30 30 import pickle
31 31 import queue as _queue
32 32 import socketserver
33 33 import xmlrpc.client as xmlrpclib
34 34
35 35 def identity(a):
36 36 return a
37 37
38 38 if ispy3:
39 39 import builtins
40 40 import functools
41 41 import io
42 42 import struct
43 43
44 44 fsencode = os.fsencode
45 45 fsdecode = os.fsdecode
46 46 # Bytes versions of os string attributes (os.linesep, os.name, etc.).
47 47 oslinesep = os.linesep.encode('ascii')
48 48 osname = os.name.encode('ascii')
49 49 ospathsep = os.pathsep.encode('ascii')
50 50 ossep = os.sep.encode('ascii')
51 51 osaltsep = os.altsep
52 52 if osaltsep:
53 53 osaltsep = osaltsep.encode('ascii')
54 54 # os.getcwd() on Python 3 returns string, but it has os.getcwdb() which
55 55 # returns bytes.
56 56 getcwd = os.getcwdb
57 57 sysplatform = sys.platform.encode('ascii')
58 58 sysexecutable = sys.executable
59 59 if sysexecutable:
60 60 sysexecutable = os.fsencode(sysexecutable)
61 61 stringio = io.BytesIO
62 62 maplist = lambda *args: list(map(*args))
63 63
64 64 # TODO: .buffer might not exist if std streams were replaced; we'll need
65 65 # a silly wrapper to make a bytes stream backed by a unicode one.
66 66 stdin = sys.stdin.buffer
67 67 stdout = sys.stdout.buffer
68 68 stderr = sys.stderr.buffer
69 69
70 70 # Since Python 3 converts argv to wchar_t type by Py_DecodeLocale() on Unix,
71 71 # we can use os.fsencode() to get back bytes argv.
72 72 #
73 73 # https://hg.python.org/cpython/file/v3.5.1/Programs/python.c#l55
74 74 #
75 75 # TODO: On Windows, the native argv is wchar_t, so we'll need a different
76 76 # workaround to simulate the Python 2 (i.e. ANSI Win32 API) behavior.
77 77 if getattr(sys, 'argv', None) is not None:
78 78 sysargv = list(map(os.fsencode, sys.argv))
79 79
80 80 bytechr = struct.Struct('>B').pack
81 81
82 82 class bytestr(bytes):
83 83 """A bytes which mostly acts as a Python 2 str
84 84
85 85 >>> bytestr(), bytestr(bytearray(b'foo')), bytestr(u'ascii'), bytestr(1)
86 86 (b'', b'foo', b'ascii', b'1')
87 87 >>> s = bytestr(b'foo')
88 88 >>> assert s is bytestr(s)
89 89
90 90 There's no implicit conversion from non-ascii str as its encoding is
91 91 unknown:
92 92
93 93 >>> bytestr(chr(0x80)) # doctest: +ELLIPSIS
94 94 Traceback (most recent call last):
95 95 ...
96 96 UnicodeEncodeError: ...
97 97
98 98 Comparison between bytestr and bytes should work:
99 99
100 100 >>> assert bytestr(b'foo') == b'foo'
101 101 >>> assert b'foo' == bytestr(b'foo')
102 102 >>> assert b'f' in bytestr(b'foo')
103 103 >>> assert bytestr(b'f') in b'foo'
104 104
105 105 Sliced elements should be bytes, not integer:
106 106
107 107 >>> s[1], s[:2]
108 108 (b'o', b'fo')
109 109 >>> list(s), list(reversed(s))
110 110 ([b'f', b'o', b'o'], [b'o', b'o', b'f'])
111 111
112 112 As bytestr type isn't propagated across operations, you need to cast
113 113 bytes to bytestr explicitly:
114 114
115 115 >>> s = bytestr(b'foo').upper()
116 116 >>> t = bytestr(s)
117 117 >>> s[0], t[0]
118 118 (70, b'F')
119 119
120 120 Be careful to not pass a bytestr object to a function which expects
121 121 bytearray-like behavior.
122 122
123 123 >>> t = bytes(t) # cast to bytes
124 124 >>> assert type(t) is bytes
125 125 """
126 126
127 127 def __new__(cls, s=b''):
128 128 if isinstance(s, bytestr):
129 129 return s
130 130 if not isinstance(s, (bytes, bytearray)):
131 131 s = str(s).encode(u'ascii')
132 132 return bytes.__new__(cls, s)
133 133
134 134 def __getitem__(self, key):
135 135 s = bytes.__getitem__(self, key)
136 136 if not isinstance(s, bytes):
137 137 s = bytechr(s)
138 138 return s
139 139
140 140 def __iter__(self):
141 141 return iterbytestr(bytes.__iter__(self))
142 142
143 143 def iterbytestr(s):
144 144 """Iterate bytes as if it were a str object of Python 2"""
145 145 return map(bytechr, s)
146 146
147 147 def sysbytes(s):
148 148 """Convert an internal str (e.g. keyword, __doc__) back to bytes
149 149
150 150 This never raises UnicodeEncodeError, but only ASCII characters
151 151 can be round-tripped by sysstr(sysbytes(s)).
152 152 """
153 153 return s.encode(u'utf-8')
154 154
155 155 def sysstr(s):
156 156 """Return a keyword str to be passed to Python functions such as
157 157 getattr() and str.encode()
158 158
159 159 This never raises UnicodeDecodeError. Non-ascii characters are
160 160 considered invalid and mapped to arbitrary but unique code points
161 161 such that 'sysstr(a) != sysstr(b)' for all 'a != b'.
162 162 """
163 163 if isinstance(s, builtins.str):
164 164 return s
165 165 return s.decode(u'latin-1')
166 166
167 def raisewithtb(exc, tb):
168 """Raise exception with the given traceback"""
169 raise exc.with_traceback(tb)
170
167 171 def _wrapattrfunc(f):
168 172 @functools.wraps(f)
169 173 def w(object, name, *args):
170 174 return f(object, sysstr(name), *args)
171 175 return w
172 176
173 177 # these wrappers are automagically imported by hgloader
174 178 delattr = _wrapattrfunc(builtins.delattr)
175 179 getattr = _wrapattrfunc(builtins.getattr)
176 180 hasattr = _wrapattrfunc(builtins.hasattr)
177 181 setattr = _wrapattrfunc(builtins.setattr)
178 182 xrange = builtins.range
179 183 unicode = str
180 184
181 185 def open(name, mode='r', buffering=-1):
182 186 return builtins.open(name, sysstr(mode), buffering)
183 187
184 188 # getopt.getopt() on Python 3 deals with unicodes internally so we cannot
185 189 # pass bytes there. Passing unicodes will result in unicodes as return
186 190 # values which we need to convert again to bytes.
187 191 def getoptb(args, shortlist, namelist):
188 192 args = [a.decode('latin-1') for a in args]
189 193 shortlist = shortlist.decode('latin-1')
190 194 namelist = [a.decode('latin-1') for a in namelist]
191 195 opts, args = getopt.getopt(args, shortlist, namelist)
192 196 opts = [(a[0].encode('latin-1'), a[1].encode('latin-1'))
193 197 for a in opts]
194 198 args = [a.encode('latin-1') for a in args]
195 199 return opts, args
196 200
197 201 # keys of keyword arguments in Python need to be strings which are unicodes
198 202 # Python 3. This function takes keyword arguments, convert the keys to str.
199 203 def strkwargs(dic):
200 204 dic = dict((k.decode('latin-1'), v) for k, v in dic.iteritems())
201 205 return dic
202 206
203 207 # keys of keyword arguments need to be unicode while passing into
204 208 # a function. This function helps us to convert those keys back to bytes
205 209 # again as we need to deal with bytes.
206 210 def byteskwargs(dic):
207 211 dic = dict((k.encode('latin-1'), v) for k, v in dic.iteritems())
208 212 return dic
209 213
210 214 # shlex.split() accepts unicodes on Python 3. This function takes a bytes
211 215 # argument, converts it to unicode, passes it to shlex.split(), converts the
212 216 # returned values back to bytes, and returns them.
213 217 # TODO: handle shlex.shlex().
214 218 def shlexsplit(s):
215 219 ret = shlex.split(s.decode('latin-1'))
216 220 return [a.encode('latin-1') for a in ret]
217 221
218 222 else:
219 223 import cStringIO
220 224
221 225 bytechr = chr
222 226 bytestr = str
223 227 iterbytestr = iter
224 228 sysbytes = identity
225 229 sysstr = identity
226 230
231 # this can't be parsed on Python 3
232 exec('def raisewithtb(exc, tb):\n'
233 ' raise exc, None, tb\n')
234
227 235 # Partial backport from os.py in Python 3, which only accepts bytes.
228 236 # In Python 2, our paths should only ever be bytes; a unicode path
229 237 # indicates a bug.
230 238 def fsencode(filename):
231 239 if isinstance(filename, str):
232 240 return filename
233 241 else:
234 242 raise TypeError(
235 243 "expect str, not %s" % type(filename).__name__)
236 244
237 245 # In Python 2, fsdecode() is very likely to receive bytes, so it's
238 246 # better not to touch the Python 2 part as it's already working fine.
239 247 fsdecode = identity
240 248
241 249 def getoptb(args, shortlist, namelist):
242 250 return getopt.getopt(args, shortlist, namelist)
243 251
244 252 strkwargs = identity
245 253 byteskwargs = identity
246 254
247 255 oslinesep = os.linesep
248 256 osname = os.name
249 257 ospathsep = os.pathsep
250 258 ossep = os.sep
251 259 osaltsep = os.altsep
252 260 stdin = sys.stdin
253 261 stdout = sys.stdout
254 262 stderr = sys.stderr
255 263 if getattr(sys, 'argv', None) is not None:
256 264 sysargv = sys.argv
257 265 sysplatform = sys.platform
258 266 getcwd = os.getcwd
259 267 sysexecutable = sys.executable
260 268 shlexsplit = shlex.split
261 269 stringio = cStringIO.StringIO
262 270 maplist = map
263 271
264 272 empty = _queue.Empty
265 273 queue = _queue.Queue
266 274
267 275 class _pycompatstub(object):
268 276 def __init__(self):
269 277 self._aliases = {}
270 278
271 279 def _registeraliases(self, origin, items):
272 280 """Add items that will be populated at the first access"""
273 281 items = map(sysstr, items)
274 282 self._aliases.update(
275 283 (item.replace(sysstr('_'), sysstr('')).lower(), (origin, item))
276 284 for item in items)
277 285
278 286 def _registeralias(self, origin, attr, name):
279 287 """Alias ``origin``.``attr`` as ``name``"""
280 288 self._aliases[sysstr(name)] = (origin, sysstr(attr))
281 289
282 290 def __getattr__(self, name):
283 291 try:
284 292 origin, item = self._aliases[name]
285 293 except KeyError:
286 294 raise AttributeError(name)
287 295 self.__dict__[name] = obj = getattr(origin, item)
288 296 return obj
289 297
290 298 httpserver = _pycompatstub()
291 299 urlreq = _pycompatstub()
292 300 urlerr = _pycompatstub()
293 301 if not ispy3:
294 302 import BaseHTTPServer
295 303 import CGIHTTPServer
296 304 import SimpleHTTPServer
297 305 import urllib2
298 306 import urllib
299 307 import urlparse
300 308 urlreq._registeraliases(urllib, (
301 309 "addclosehook",
302 310 "addinfourl",
303 311 "ftpwrapper",
304 312 "pathname2url",
305 313 "quote",
306 314 "splitattr",
307 315 "splitpasswd",
308 316 "splitport",
309 317 "splituser",
310 318 "unquote",
311 319 "url2pathname",
312 320 "urlencode",
313 321 ))
314 322 urlreq._registeraliases(urllib2, (
315 323 "AbstractHTTPHandler",
316 324 "BaseHandler",
317 325 "build_opener",
318 326 "FileHandler",
319 327 "FTPHandler",
320 328 "HTTPBasicAuthHandler",
321 329 "HTTPDigestAuthHandler",
322 330 "HTTPHandler",
323 331 "HTTPPasswordMgrWithDefaultRealm",
324 332 "HTTPSHandler",
325 333 "install_opener",
326 334 "ProxyHandler",
327 335 "Request",
328 336 "urlopen",
329 337 ))
330 338 urlreq._registeraliases(urlparse, (
331 339 "urlparse",
332 340 "urlunparse",
333 341 ))
334 342 urlerr._registeraliases(urllib2, (
335 343 "HTTPError",
336 344 "URLError",
337 345 ))
338 346 httpserver._registeraliases(BaseHTTPServer, (
339 347 "HTTPServer",
340 348 "BaseHTTPRequestHandler",
341 349 ))
342 350 httpserver._registeraliases(SimpleHTTPServer, (
343 351 "SimpleHTTPRequestHandler",
344 352 ))
345 353 httpserver._registeraliases(CGIHTTPServer, (
346 354 "CGIHTTPRequestHandler",
347 355 ))
348 356
349 357 else:
350 358 import urllib.parse
351 359 urlreq._registeraliases(urllib.parse, (
352 360 "splitattr",
353 361 "splitpasswd",
354 362 "splitport",
355 363 "splituser",
356 364 "urlparse",
357 365 "urlunparse",
358 366 ))
359 367 urlreq._registeralias(urllib.parse, "unquote_to_bytes", "unquote")
360 368 import urllib.request
361 369 urlreq._registeraliases(urllib.request, (
362 370 "AbstractHTTPHandler",
363 371 "BaseHandler",
364 372 "build_opener",
365 373 "FileHandler",
366 374 "FTPHandler",
367 375 "ftpwrapper",
368 376 "HTTPHandler",
369 377 "HTTPSHandler",
370 378 "install_opener",
371 379 "pathname2url",
372 380 "HTTPBasicAuthHandler",
373 381 "HTTPDigestAuthHandler",
374 382 "HTTPPasswordMgrWithDefaultRealm",
375 383 "ProxyHandler",
376 384 "Request",
377 385 "url2pathname",
378 386 "urlopen",
379 387 ))
380 388 import urllib.response
381 389 urlreq._registeraliases(urllib.response, (
382 390 "addclosehook",
383 391 "addinfourl",
384 392 ))
385 393 import urllib.error
386 394 urlerr._registeraliases(urllib.error, (
387 395 "HTTPError",
388 396 "URLError",
389 397 ))
390 398 import http.server
391 399 httpserver._registeraliases(http.server, (
392 400 "HTTPServer",
393 401 "BaseHTTPRequestHandler",
394 402 "SimpleHTTPRequestHandler",
395 403 "CGIHTTPRequestHandler",
396 404 ))
397 405
398 406 # urllib.parse.quote() accepts both str and bytes, decodes bytes
399 407 # (if necessary), and returns str. This is wonky. We provide a custom
400 408 # implementation that only accepts bytes and emits bytes.
401 409 def quote(s, safe=r'/'):
402 410 s = urllib.parse.quote_from_bytes(s, safe=safe)
403 411 return s.encode('ascii', 'strict')
404 412
405 413 # urllib.parse.urlencode() returns str. We use this function to make
406 414 # sure we return bytes.
407 415 def urlencode(query, doseq=False):
408 416 s = urllib.parse.urlencode(query, doseq=doseq)
409 417 return s.encode('ascii')
410 418
411 419 urlreq.quote = quote
412 420 urlreq.urlencode = urlencode
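The new raisewithtb() helper above is the core of this changeset: on Python 3 it re-raises via exc.with_traceback(tb), while on Python 2 the three-argument raise is hidden behind exec() because that syntax no longer parses on Python 3. A minimal usage sketch follows (the wrapper function and the Abort message are illustrative assumptions, not part of the change):

import sys

from mercurial import error, pycompat

def lookup_or_abort(mapping, key):
    # hypothetical caller: translate KeyError into Abort while keeping the
    # original traceback, so the failure still points at the real source line
    try:
        return mapping[key]
    except KeyError as exc:
        tb = sys.exc_info()[2]
        pycompat.raisewithtb(error.Abort('unknown key: %s' % exc), tb)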