##// END OF EJS Templates
typing: add trivial type hints to mercurial/ui.py...
Matt Harbison -
r50692:0449fb77 default
parent child Browse files
Show More
@@ -1,2320 +1,2333 b''
1 1 # ui.py - user interface bits for mercurial
2 2 #
3 3 # Copyright 2005-2007 Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import collections
10 10 import contextlib
11 11 import datetime
12 12 import errno
13 13 import inspect
14 14 import os
15 15 import re
16 16 import signal
17 17 import socket
18 18 import subprocess
19 19 import sys
20 20 import traceback
21 21
22 22 from typing import (
23 Any,
24 Callable,
23 25 Dict,
24 26 List,
27 NoReturn,
25 28 Optional,
26 29 Tuple,
30 Type,
31 TypeVar,
27 32 Union,
28 33 cast,
29 34 )
30 35
31 36 from .i18n import _
32 37 from .node import hex
33 38 from .pycompat import (
34 39 getattr,
35 40 open,
36 41 )
37 42
38 43 from . import (
39 44 color,
40 45 config,
41 46 configitems,
42 47 encoding,
43 48 error,
44 49 formatter,
45 50 loggingutil,
46 51 progress,
47 52 pycompat,
48 53 rcutil,
49 54 scmutil,
50 55 util,
51 56 )
52 57 from .utils import (
53 58 dateutil,
54 59 procutil,
55 60 resourceutil,
56 61 stringutil,
57 62 urlutil,
58 63 )
59 64
65 _ConfigItems = Dict[Tuple[bytes, bytes], object] # {(section, name) : value}
60 66 # The **opts args of the various write() methods can be basically anything, but
61 67 # there's no way to express it as "anything but str". So type it to be the
62 68 # handful of known types that are used.
63 69 _MsgOpts = Union[bytes, bool, List["_PromptChoice"]]
64 70 _PromptChoice = Tuple[bytes, bytes]
71 _Tui = TypeVar('_Tui', bound="ui")
65 72
66 73 urlreq = util.urlreq
67 74
68 75 # for use with str.translate(None, _keepalnum), to keep just alphanumerics
69 _keepalnum = b''.join(
76 _keepalnum: bytes = b''.join(
70 77 c for c in map(pycompat.bytechr, range(256)) if not c.isalnum()
71 78 )
72 79
73 80 # The config knobs that will be altered (if unset) by ui.tweakdefaults.
74 tweakrc = b"""
81 tweakrc: bytes = b"""
75 82 [ui]
76 83 # The rollback command is dangerous. As a rule, don't use it.
77 84 rollback = False
78 85 # Make `hg status` report copy information
79 86 statuscopies = yes
80 87 # Prefer curses UIs when available. Revert to plain-text with `text`.
81 88 interface = curses
82 89 # Make compatible commands emit cwd-relative paths by default.
83 90 relative-paths = yes
84 91
85 92 [commands]
86 93 # Grep working directory by default.
87 94 grep.all-files = True
88 95 # Refuse to perform an `hg update` that would cause a file content merge
89 96 update.check = noconflict
90 97 # Show conflicts information in `hg status`
91 98 status.verbose = True
92 99 # Make `hg resolve` with no action (like `-m`) fail instead of re-merging.
93 100 resolve.explicit-re-merge = True
94 101
95 102 [diff]
96 103 git = 1
97 104 showfunc = 1
98 105 word-diff = 1
99 106 """
100 107
101 samplehgrcs = {
108 samplehgrcs: Dict[bytes, bytes] = {
102 109 b'user': b"""# example user config (see 'hg help config' for more info)
103 110 [ui]
104 111 # name and email, e.g.
105 112 # username = Jane Doe <jdoe@example.com>
106 113 username =
107 114
108 115 # We recommend enabling tweakdefaults to get slight improvements to
109 116 # the UI over time. Make sure to set HGPLAIN in the environment when
110 117 # writing scripts!
111 118 # tweakdefaults = True
112 119
113 120 # uncomment to disable color in command output
114 121 # (see 'hg help color' for details)
115 122 # color = never
116 123
117 124 # uncomment to disable command output pagination
118 125 # (see 'hg help pager' for details)
119 126 # paginate = never
120 127
121 128 [extensions]
122 129 # uncomment the lines below to enable some popular extensions
123 130 # (see 'hg help extensions' for more info)
124 131 #
125 132 # histedit =
126 133 # rebase =
127 134 # uncommit =
128 135 """,
129 136 b'cloned': b"""# example repository config (see 'hg help config' for more info)
130 137 [paths]
131 138 default = %s
132 139
133 140 # path aliases to other clones of this repo in URLs or filesystem paths
134 141 # (see 'hg help config.paths' for more info)
135 142 #
136 143 # default:pushurl = ssh://jdoe@example.net/hg/jdoes-fork
137 144 # my-fork = ssh://jdoe@example.net/hg/jdoes-fork
138 145 # my-clone = /home/jdoe/jdoes-clone
139 146
140 147 [ui]
141 148 # name and email (local to this repository, optional), e.g.
142 149 # username = Jane Doe <jdoe@example.com>
143 150 """,
144 151 b'local': b"""# example repository config (see 'hg help config' for more info)
145 152 [paths]
146 153 # path aliases to other clones of this repo in URLs or filesystem paths
147 154 # (see 'hg help config.paths' for more info)
148 155 #
149 156 # default = http://example.com/hg/example-repo
150 157 # default:pushurl = ssh://jdoe@example.net/hg/jdoes-fork
151 158 # my-fork = ssh://jdoe@example.net/hg/jdoes-fork
152 159 # my-clone = /home/jdoe/jdoes-clone
153 160
154 161 [ui]
155 162 # name and email (local to this repository, optional), e.g.
156 163 # username = Jane Doe <jdoe@example.com>
157 164 """,
158 165 b'global': b"""# example system-wide hg config (see 'hg help config' for more info)
159 166
160 167 [ui]
161 168 # uncomment to disable color in command output
162 169 # (see 'hg help color' for details)
163 170 # color = never
164 171
165 172 # uncomment to disable command output pagination
166 173 # (see 'hg help pager' for details)
167 174 # paginate = never
168 175
169 176 [extensions]
170 177 # uncomment the lines below to enable some popular extensions
171 178 # (see 'hg help extensions' for more info)
172 179 #
173 180 # blackbox =
174 181 # churn =
175 182 """,
176 183 }
177 184
178 185
179 186 def _maybestrurl(maybebytes):
180 187 return pycompat.rapply(pycompat.strurl, maybebytes)
181 188
182 189
183 190 def _maybebytesurl(maybestr):
184 191 return pycompat.rapply(pycompat.bytesurl, maybestr)
185 192
186 193
187 194 class httppasswordmgrdbproxy:
188 195 """Delays loading urllib2 until it's needed."""
189 196
190 def __init__(self):
197 def __init__(self) -> None:
191 198 self._mgr = None
192 199
193 200 def _get_mgr(self):
194 201 if self._mgr is None:
195 202 self._mgr = urlreq.httppasswordmgrwithdefaultrealm()
196 203 return self._mgr
197 204
198 205 def add_password(self, realm, uris, user, passwd):
199 206 return self._get_mgr().add_password(
200 207 _maybestrurl(realm),
201 208 _maybestrurl(uris),
202 209 _maybestrurl(user),
203 210 _maybestrurl(passwd),
204 211 )
205 212
206 213 def find_user_password(self, realm, uri):
207 214 mgr = self._get_mgr()
208 215 return _maybebytesurl(
209 216 mgr.find_user_password(_maybestrurl(realm), _maybestrurl(uri))
210 217 )
211 218
212 219
213 def _catchterm(*args):
220 def _catchterm(*args) -> NoReturn:
214 221 raise error.SignalInterrupt
215 222
216 223
217 224 # unique object used to detect no default value has been provided when
218 225 # retrieving configuration value.
219 226 _unset = object()
220 227
221 228 # _reqexithandlers: callbacks run at the end of a request
222 _reqexithandlers = []
229 _reqexithandlers: List = []
223 230
224 231
225 232 class ui:
226 def __init__(self, src=None):
233 def __init__(self, src: Optional["ui"] = None) -> None:
227 234 """Create a fresh new ui object if no src given
228 235
229 236 Use uimod.ui.load() to create a ui which knows global and user configs.
230 237 In most cases, you should use ui.copy() to create a copy of an existing
231 238 ui object.
232 239 """
233 240 # _buffers: used for temporary capture of output
234 241 self._buffers = []
235 242 # 3-tuple describing how each buffer in the stack behaves.
236 243 # Values are (capture stderr, capture subprocesses, apply labels).
237 244 self._bufferstates = []
238 245 # When a buffer is active, defines whether we are expanding labels.
239 246 # This exists to prevent an extra list lookup.
240 247 self._bufferapplylabels = None
241 248 self.quiet = self.verbose = self.debugflag = self.tracebackflag = False
242 249 self._reportuntrusted = True
243 250 self._knownconfig = configitems.coreitems
244 251 self._ocfg = config.config() # overlay
245 252 self._tcfg = config.config() # trusted
246 253 self._ucfg = config.config() # untrusted
247 254 self._trustusers = set()
248 255 self._trustgroups = set()
249 256 self.callhooks = True
250 257 # hold the root to use for each [paths] entry
251 258 self._path_to_root = {}
252 259 # Insecure server connections requested.
253 260 self.insecureconnections = False
254 261 # Blocked time
255 262 self.logblockedtimes = False
256 263 # color mode: see mercurial/color.py for possible value
257 264 self._colormode = None
258 265 self._terminfoparams = {}
259 266 self._styles = {}
260 267 self._uninterruptible = False
261 268 self.showtimestamp = False
262 269
263 270 if src:
264 271 self._fout = src._fout
265 272 self._ferr = src._ferr
266 273 self._fin = src._fin
267 274 self._fmsg = src._fmsg
268 275 self._fmsgout = src._fmsgout
269 276 self._fmsgerr = src._fmsgerr
270 277 self._finoutredirected = src._finoutredirected
271 278 self._loggers = src._loggers.copy()
272 279 self.pageractive = src.pageractive
273 280 self._disablepager = src._disablepager
274 281 self._tweaked = src._tweaked
275 282
276 283 self._tcfg = src._tcfg.copy()
277 284 self._ucfg = src._ucfg.copy()
278 285 self._ocfg = src._ocfg.copy()
279 286 self._trustusers = src._trustusers.copy()
280 287 self._trustgroups = src._trustgroups.copy()
281 288 self.environ = src.environ
282 289 self.callhooks = src.callhooks
283 290 self._path_to_root = src._path_to_root
284 291 self.insecureconnections = src.insecureconnections
285 292 self._colormode = src._colormode
286 293 self._terminfoparams = src._terminfoparams.copy()
287 294 self._styles = src._styles.copy()
288 295
289 296 self.fixconfig()
290 297
291 298 self.httppasswordmgrdb = src.httppasswordmgrdb
292 299 self._blockedtimes = src._blockedtimes
293 300 else:
294 301 self._fout = procutil.stdout
295 302 self._ferr = procutil.stderr
296 303 self._fin = procutil.stdin
297 304 self._fmsg = None
298 305 self._fmsgout = self.fout # configurable
299 306 self._fmsgerr = self.ferr # configurable
300 307 self._finoutredirected = False
301 308 self._loggers = {}
302 309 self.pageractive = False
303 310 self._disablepager = False
304 311 self._tweaked = False
305 312
306 313 # shared read-only environment
307 314 self.environ = encoding.environ
308 315
309 316 self.httppasswordmgrdb = httppasswordmgrdbproxy()
310 317 self._blockedtimes = collections.defaultdict(int)
311 318
312 319 allowed = self.configlist(b'experimental', b'exportableenviron')
313 320 if b'*' in allowed:
314 321 self._exportableenviron = self.environ
315 322 else:
316 323 self._exportableenviron = {}
317 324 for k in allowed:
318 325 if k in self.environ:
319 326 self._exportableenviron[k] = self.environ[k]
320 327
321 def _new_source(self):
328 def _new_source(self) -> None:
322 329 self._ocfg.new_source()
323 330 self._tcfg.new_source()
324 331 self._ucfg.new_source()
325 332
326 333 @classmethod
327 def load(cls):
334 def load(cls: Type[_Tui]) -> _Tui:
328 335 """Create a ui and load global and user configs"""
329 336 u = cls()
330 337 # we always trust global config files and environment variables
331 338 for t, f in rcutil.rccomponents():
332 339 if t == b'path':
333 340 u.readconfig(f, trust=True)
334 341 elif t == b'resource':
335 342 u.read_resource_config(f, trust=True)
336 343 elif t == b'items':
337 344 u._new_source()
338 345 sections = set()
339 346 for section, name, value, source in f:
340 347 # do not set u._ocfg
341 348 # XXX clean this up once immutable config object is a thing
342 349 u._tcfg.set(section, name, value, source)
343 350 u._ucfg.set(section, name, value, source)
344 351 sections.add(section)
345 352 for section in sections:
346 353 u.fixconfig(section=section)
347 354 else:
348 355 raise error.ProgrammingError(b'unknown rctype: %s' % t)
349 356 u._maybetweakdefaults()
350 357 u._new_source() # anything after that is a different level
351 358 return u
352 359
353 def _maybetweakdefaults(self):
360 def _maybetweakdefaults(self) -> None:
354 361 if not self.configbool(b'ui', b'tweakdefaults'):
355 362 return
356 363 if self._tweaked or self.plain(b'tweakdefaults'):
357 364 return
358 365
359 366 # Note: it is SUPER IMPORTANT that you set self._tweaked to
360 367 # True *before* any calls to setconfig(), otherwise you'll get
361 368 # infinite recursion between setconfig and this method.
362 369 #
363 370 # TODO: We should extract an inner method in setconfig() to
364 371 # avoid this weirdness.
365 372 self._tweaked = True
366 373 tmpcfg = config.config()
367 374 tmpcfg.parse(b'<tweakdefaults>', tweakrc)
368 375 for section in tmpcfg:
369 376 for name, value in tmpcfg.items(section):
370 377 if not self.hasconfig(section, name):
371 378 self.setconfig(section, name, value, b"<tweakdefaults>")
372 379
373 def copy(self):
380 def copy(self: _Tui) -> _Tui:
374 381 return self.__class__(self)
375 382
376 def resetstate(self):
383 def resetstate(self) -> None:
377 384 """Clear internal state that shouldn't persist across commands"""
378 385 if self._progbar:
379 386 self._progbar.resetstate() # reset last-print time of progress bar
380 387 self.httppasswordmgrdb = httppasswordmgrdbproxy()
381 388
382 389 @contextlib.contextmanager
383 def timeblockedsection(self, key):
390 def timeblockedsection(self, key: bytes):
384 391 # this is open-coded below - search for timeblockedsection to find them
385 392 starttime = util.timer()
386 393 try:
387 394 yield
388 395 finally:
389 396 self._blockedtimes[key + b'_blocked'] += (
390 397 util.timer() - starttime
391 398 ) * 1000
392 399
393 400 @contextlib.contextmanager
394 401 def uninterruptible(self):
395 402 """Mark an operation as unsafe.
396 403
397 404 Most operations on a repository are safe to interrupt, but a
398 405 few are risky (for example repair.strip). This context manager
399 406 lets you advise Mercurial that something risky is happening so
400 407 that control-C etc can be blocked if desired.
401 408 """
402 409 enabled = self.configbool(b'experimental', b'nointerrupt')
403 410 if enabled and self.configbool(
404 411 b'experimental', b'nointerrupt-interactiveonly'
405 412 ):
406 413 enabled = self.interactive()
407 414 if self._uninterruptible or not enabled:
408 415 # if nointerrupt support is turned off, the process isn't
409 416 # interactive, or we're already in an uninterruptible
410 417 # block, do nothing.
411 418 yield
412 419 return
413 420
414 421 def warn():
415 422 self.warn(_(b"shutting down cleanly\n"))
416 423 self.warn(
417 424 _(b"press ^C again to terminate immediately (dangerous)\n")
418 425 )
419 426 return True
420 427
421 428 with procutil.uninterruptible(warn):
422 429 try:
423 430 self._uninterruptible = True
424 431 yield
425 432 finally:
426 433 self._uninterruptible = False
427 434
428 def formatter(self, topic, opts):
435 def formatter(self, topic: bytes, opts):
429 436 return formatter.formatter(self, self, topic, opts)
430 437
431 def _trusted(self, fp, f):
438 def _trusted(self, fp, f: bytes) -> bool:
432 439 st = util.fstat(fp)
433 440 if util.isowner(st):
434 441 return True
435 442
436 443 tusers, tgroups = self._trustusers, self._trustgroups
437 444 if b'*' in tusers or b'*' in tgroups:
438 445 return True
439 446
440 447 user = util.username(st.st_uid)
441 448 group = util.groupname(st.st_gid)
442 449 if user in tusers or group in tgroups or user == util.username():
443 450 return True
444 451
445 452 if self._reportuntrusted:
446 453 self.warn(
447 454 _(
448 455 b'not trusting file %s from untrusted '
449 456 b'user %s, group %s\n'
450 457 )
451 458 % (f, user, group)
452 459 )
453 460 return False
454 461
455 462 def read_resource_config(
456 463 self, name, root=None, trust=False, sections=None, remap=None
457 ):
464 ) -> None:
458 465 try:
459 466 fp = resourceutil.open_resource(name[0], name[1])
460 467 except IOError:
461 468 if not sections: # ignore unless we were looking for something
462 469 return
463 470 raise
464 471
465 472 self._readconfig(
466 473 b'resource:%s.%s' % name, fp, root, trust, sections, remap
467 474 )
468 475
469 476 def readconfig(
470 477 self, filename, root=None, trust=False, sections=None, remap=None
471 ):
478 ) -> None:
472 479 try:
473 480 fp = open(filename, 'rb')
474 481 except IOError:
475 482 if not sections: # ignore unless we were looking for something
476 483 return
477 484 raise
478 485
479 486 self._readconfig(filename, fp, root, trust, sections, remap)
480 487
481 488 def _readconfig(
482 489 self, filename, fp, root=None, trust=False, sections=None, remap=None
483 ):
490 ) -> None:
484 491 with fp:
485 492 cfg = config.config()
486 493 trusted = sections or trust or self._trusted(fp, filename)
487 494
488 495 try:
489 496 cfg.read(filename, fp, sections=sections, remap=remap)
490 497 except error.ConfigError as inst:
491 498 if trusted:
492 499 raise
493 500 self.warn(
494 501 _(b'ignored %s: %s\n') % (inst.location, inst.message)
495 502 )
496 503
497 504 self._applyconfig(cfg, trusted, root)
498 505
499 def applyconfig(self, configitems, source=b"", root=None):
506 def applyconfig(
507 self, configitems: _ConfigItems, source=b"", root=None
508 ) -> None:
500 509 """Add configitems from a non-file source. Unlike with ``setconfig()``,
501 510 they can be overridden by subsequent config file reads. The items are
502 511 in the same format as ``configoverride()``, namely a dict of the
503 512 following structures: {(section, name) : value}
504 513
505 514 Typically this is used by extensions that inject themselves into the
506 515 config file load procedure by monkeypatching ``localrepo.loadhgrc()``.
507 516 """
508 517 cfg = config.config()
509 518
510 519 for (section, name), value in configitems.items():
511 520 cfg.set(section, name, value, source)
512 521
513 522 self._applyconfig(cfg, True, root)
514 523
515 def _applyconfig(self, cfg, trusted, root):
524 def _applyconfig(self, cfg, trusted, root) -> None:
516 525 if self.plain():
517 526 for k in (
518 527 b'debug',
519 528 b'fallbackencoding',
520 529 b'quiet',
521 530 b'slash',
522 531 b'logtemplate',
523 532 b'message-output',
524 533 b'statuscopies',
525 534 b'style',
526 535 b'traceback',
527 536 b'verbose',
528 537 ):
529 538 if k in cfg[b'ui']:
530 539 del cfg[b'ui'][k]
531 540 for k, v in cfg.items(b'defaults'):
532 541 del cfg[b'defaults'][k]
533 542 for k, v in cfg.items(b'commands'):
534 543 del cfg[b'commands'][k]
535 544 for k, v in cfg.items(b'command-templates'):
536 545 del cfg[b'command-templates'][k]
537 546 # Don't remove aliases from the configuration if in the exceptionlist
538 547 if self.plain(b'alias'):
539 548 for k, v in cfg.items(b'alias'):
540 549 del cfg[b'alias'][k]
541 550 if self.plain(b'revsetalias'):
542 551 for k, v in cfg.items(b'revsetalias'):
543 552 del cfg[b'revsetalias'][k]
544 553 if self.plain(b'templatealias'):
545 554 for k, v in cfg.items(b'templatealias'):
546 555 del cfg[b'templatealias'][k]
547 556
548 557 if trusted:
549 558 self._tcfg.update(cfg)
550 559 self._tcfg.update(self._ocfg)
551 560 self._ucfg.update(cfg)
552 561 self._ucfg.update(self._ocfg)
553 562
554 563 if root is None:
555 564 root = os.path.expanduser(b'~')
556 565 self.fixconfig(root=root)
557 566
558 def fixconfig(self, root=None, section=None):
567 def fixconfig(self, root=None, section=None) -> None:
559 568 if section in (None, b'paths'):
560 569 # expand vars and ~
561 570 # translate paths relative to root (or home) into absolute paths
562 571 root = root or encoding.getcwd()
563 572 for c in self._tcfg, self._ucfg, self._ocfg:
564 573 for n, p in c.items(b'paths'):
565 574 old_p = p
566 575 s = self.configsource(b'paths', n) or b'none'
567 576 root_key = (n, p, s)
568 577 if root_key not in self._path_to_root:
569 578 self._path_to_root[root_key] = root
570 579 # Ignore sub-options.
571 580 if b':' in n:
572 581 continue
573 582 if not p:
574 583 continue
575 584 if b'%%' in p:
576 585 if s is None:
577 586 s = 'none'
578 587 self.warn(
579 588 _(b"(deprecated '%%' in path %s=%s from %s)\n")
580 589 % (n, p, s)
581 590 )
582 591 p = p.replace(b'%%', b'%')
583 592 if p != old_p:
584 593 c.alter(b"paths", n, p)
585 594
586 595 if section in (None, b'ui'):
587 596 # update ui options
588 597 self._fmsgout, self._fmsgerr = _selectmsgdests(self)
589 598 self.debugflag = self.configbool(b'ui', b'debug')
590 599 self.verbose = self.debugflag or self.configbool(b'ui', b'verbose')
591 600 self.quiet = not self.debugflag and self.configbool(b'ui', b'quiet')
592 601 if self.verbose and self.quiet:
593 602 self.quiet = self.verbose = False
594 603 self._reportuntrusted = self.debugflag or self.configbool(
595 604 b"ui", b"report_untrusted"
596 605 )
597 606 self.showtimestamp = self.configbool(b'ui', b'timestamp-output')
598 607 self.tracebackflag = self.configbool(b'ui', b'traceback')
599 608 self.logblockedtimes = self.configbool(b'ui', b'logblockedtimes')
600 609
601 610 if section in (None, b'trusted'):
602 611 # update trust information
603 612 self._trustusers.update(self.configlist(b'trusted', b'users'))
604 613 self._trustgroups.update(self.configlist(b'trusted', b'groups'))
605 614
606 615 if section in (None, b'devel', b'ui') and self.debugflag:
607 616 tracked = set()
608 617 if self.configbool(b'devel', b'debug.extensions'):
609 618 tracked.add(b'extension')
610 619 if tracked:
611 620 logger = loggingutil.fileobjectlogger(self._ferr, tracked)
612 621 self.setlogger(b'debug', logger)
613 622
614 623 def backupconfig(self, section, item):
615 624 return (
616 625 self._ocfg.backup(section, item),
617 626 self._tcfg.backup(section, item),
618 627 self._ucfg.backup(section, item),
619 628 )
620 629
621 def restoreconfig(self, data):
630 def restoreconfig(self, data) -> None:
622 631 self._ocfg.restore(data[0])
623 632 self._tcfg.restore(data[1])
624 633 self._ucfg.restore(data[2])
625 634
626 def setconfig(self, section, name, value, source=b''):
635 def setconfig(self, section, name, value, source=b'') -> None:
627 636 for cfg in (self._ocfg, self._tcfg, self._ucfg):
628 637 cfg.set(section, name, value, source)
629 638 self.fixconfig(section=section)
630 639 self._maybetweakdefaults()
631 640
632 641 def _data(self, untrusted):
633 642 return untrusted and self._ucfg or self._tcfg
634 643
635 644 def configsource(self, section, name, untrusted=False):
636 645 return self._data(untrusted).source(section, name)
637 646
638 647 def config(self, section, name, default=_unset, untrusted=False):
639 648 """return the plain string version of a config"""
640 649 value = self._config(
641 650 section, name, default=default, untrusted=untrusted
642 651 )
643 652 if value is _unset:
644 653 return None
645 654 return value
646 655
647 656 def _config(self, section, name, default=_unset, untrusted=False):
648 657 value = itemdefault = default
649 658 item = self._knownconfig.get(section, {}).get(name)
650 659 alternates = [(section, name)]
651 660
652 661 if item is not None:
653 662 alternates.extend(item.alias)
654 663 if callable(item.default):
655 664 itemdefault = item.default()
656 665 else:
657 666 itemdefault = item.default
658 667 else:
659 668 msg = b"accessing unregistered config item: '%s.%s'"
660 669 msg %= (section, name)
661 670 self.develwarn(msg, 2, b'warn-config-unknown')
662 671
663 672 if default is _unset:
664 673 if item is None:
665 674 value = default
666 675 elif item.default is configitems.dynamicdefault:
667 676 value = None
668 677 msg = b"config item requires an explicit default value: '%s.%s'"
669 678 msg %= (section, name)
670 679 self.develwarn(msg, 2, b'warn-config-default')
671 680 else:
672 681 value = itemdefault
673 682 elif (
674 683 item is not None
675 684 and item.default is not configitems.dynamicdefault
676 685 and default != itemdefault
677 686 ):
678 687 msg = (
679 688 b"specifying a mismatched default value for a registered "
680 689 b"config item: '%s.%s' '%s'"
681 690 )
682 691 msg %= (section, name, pycompat.bytestr(default))
683 692 self.develwarn(msg, 2, b'warn-config-default')
684 693
685 694 candidates = []
686 695 config = self._data(untrusted)
687 696 for s, n in alternates:
688 697 candidate = config.get(s, n, None)
689 698 if candidate is not None:
690 699 candidates.append((s, n, candidate))
691 700 if candidates:
692 701
693 702 def level(x):
694 703 return config.level(x[0], x[1])
695 704
696 705 value = max(candidates, key=level)[2]
697 706
698 707 if self.debugflag and not untrusted and self._reportuntrusted:
699 708 for s, n in alternates:
700 709 uvalue = self._ucfg.get(s, n)
701 710 if uvalue is not None and uvalue != value:
702 711 self.debug(
703 712 b"ignoring untrusted configuration option "
704 713 b"%s.%s = %s\n" % (s, n, uvalue)
705 714 )
706 715 return value
707 716
708 717 def config_default(self, section, name):
709 718 """return the default value for a config option
710 719
711 720 The default is returned "raw", for example if it is a callable, the
712 721 callable was not called.
713 722 """
714 723 item = self._knownconfig.get(section, {}).get(name)
715 724
716 725 if item is None:
717 726 raise KeyError((section, name))
718 727 return item.default
719 728
720 729 def configsuboptions(self, section, name, default=_unset, untrusted=False):
721 730 """Get a config option and all sub-options.
722 731
723 732 Some config options have sub-options that are declared with the
724 733 format "key:opt = value". This method is used to return the main
725 734 option and all its declared sub-options.
726 735
727 736 Returns a 2-tuple of ``(option, sub-options)``, where `sub-options``
728 737 is a dict of defined sub-options where keys and values are strings.
729 738 """
730 739 main = self.config(section, name, default, untrusted=untrusted)
731 740 data = self._data(untrusted)
732 741 sub = {}
733 742 prefix = b'%s:' % name
734 743 for k, v in data.items(section):
735 744 if k.startswith(prefix):
736 745 sub[k[len(prefix) :]] = v
737 746
738 747 if self.debugflag and not untrusted and self._reportuntrusted:
739 748 for k, v in sub.items():
740 749 uvalue = self._ucfg.get(section, b'%s:%s' % (name, k))
741 750 if uvalue is not None and uvalue != v:
742 751 self.debug(
743 752 b'ignoring untrusted configuration option '
744 753 b'%s:%s.%s = %s\n' % (section, name, k, uvalue)
745 754 )
746 755
747 756 return main, sub
748 757
749 758 def configpath(self, section, name, default=_unset, untrusted=False):
750 759 """get a path config item, expanded relative to repo root or config
751 760 file"""
752 761 v = self.config(section, name, default, untrusted)
753 762 if v is None:
754 763 return None
755 764 if not os.path.isabs(v) or b"://" not in v:
756 765 src = self.configsource(section, name, untrusted)
757 766 if b':' in src:
758 767 base = os.path.dirname(src.rsplit(b':')[0])
759 768 v = os.path.join(base, os.path.expanduser(v))
760 769 return v
761 770
762 771 def configbool(self, section, name, default=_unset, untrusted=False):
763 772 """parse a configuration element as a boolean
764 773
765 774 >>> u = ui(); s = b'foo'
766 775 >>> u.setconfig(s, b'true', b'yes')
767 776 >>> u.configbool(s, b'true')
768 777 True
769 778 >>> u.setconfig(s, b'false', b'no')
770 779 >>> u.configbool(s, b'false')
771 780 False
772 781 >>> u.configbool(s, b'unknown')
773 782 False
774 783 >>> u.configbool(s, b'unknown', True)
775 784 True
776 785 >>> u.setconfig(s, b'invalid', b'somevalue')
777 786 >>> u.configbool(s, b'invalid')
778 787 Traceback (most recent call last):
779 788 ...
780 789 ConfigError: foo.invalid is not a boolean ('somevalue')
781 790 """
782 791
783 792 v = self._config(section, name, default, untrusted=untrusted)
784 793 if v is None:
785 794 return v
786 795 if v is _unset:
787 796 if default is _unset:
788 797 return False
789 798 return default
790 799 if isinstance(v, bool):
791 800 return v
792 801 b = stringutil.parsebool(v)
793 802 if b is None:
794 803 raise error.ConfigError(
795 804 _(b"%s.%s is not a boolean ('%s')") % (section, name, v)
796 805 )
797 806 return b
798 807
799 808 def configwith(
800 809 self, convert, section, name, default=_unset, desc=None, untrusted=False
801 810 ):
802 811 """parse a configuration element with a conversion function
803 812
804 813 >>> u = ui(); s = b'foo'
805 814 >>> u.setconfig(s, b'float1', b'42')
806 815 >>> u.configwith(float, s, b'float1')
807 816 42.0
808 817 >>> u.setconfig(s, b'float2', b'-4.25')
809 818 >>> u.configwith(float, s, b'float2')
810 819 -4.25
811 820 >>> u.configwith(float, s, b'unknown', 7)
812 821 7.0
813 822 >>> u.setconfig(s, b'invalid', b'somevalue')
814 823 >>> u.configwith(float, s, b'invalid')
815 824 Traceback (most recent call last):
816 825 ...
817 826 ConfigError: foo.invalid is not a valid float ('somevalue')
818 827 >>> u.configwith(float, s, b'invalid', desc=b'womble')
819 828 Traceback (most recent call last):
820 829 ...
821 830 ConfigError: foo.invalid is not a valid womble ('somevalue')
822 831 """
823 832
824 833 v = self.config(section, name, default, untrusted)
825 834 if v is None:
826 835 return v # do not attempt to convert None
827 836 try:
828 837 return convert(v)
829 838 except (ValueError, error.ParseError):
830 839 if desc is None:
831 840 desc = pycompat.sysbytes(convert.__name__)
832 841 raise error.ConfigError(
833 842 _(b"%s.%s is not a valid %s ('%s')") % (section, name, desc, v)
834 843 )
835 844
836 845 def configint(self, section, name, default=_unset, untrusted=False):
837 846 """parse a configuration element as an integer
838 847
839 848 >>> u = ui(); s = b'foo'
840 849 >>> u.setconfig(s, b'int1', b'42')
841 850 >>> u.configint(s, b'int1')
842 851 42
843 852 >>> u.setconfig(s, b'int2', b'-42')
844 853 >>> u.configint(s, b'int2')
845 854 -42
846 855 >>> u.configint(s, b'unknown', 7)
847 856 7
848 857 >>> u.setconfig(s, b'invalid', b'somevalue')
849 858 >>> u.configint(s, b'invalid')
850 859 Traceback (most recent call last):
851 860 ...
852 861 ConfigError: foo.invalid is not a valid integer ('somevalue')
853 862 """
854 863
855 864 return self.configwith(
856 865 int, section, name, default, b'integer', untrusted
857 866 )
858 867
859 868 def configbytes(self, section, name, default=_unset, untrusted=False):
860 869 """parse a configuration element as a quantity in bytes
861 870
862 871 Units can be specified as b (bytes), k or kb (kilobytes), m or
863 872 mb (megabytes), g or gb (gigabytes).
864 873
865 874 >>> u = ui(); s = b'foo'
866 875 >>> u.setconfig(s, b'val1', b'42')
867 876 >>> u.configbytes(s, b'val1')
868 877 42
869 878 >>> u.setconfig(s, b'val2', b'42.5 kb')
870 879 >>> u.configbytes(s, b'val2')
871 880 43520
872 881 >>> u.configbytes(s, b'unknown', b'7 MB')
873 882 7340032
874 883 >>> u.setconfig(s, b'invalid', b'somevalue')
875 884 >>> u.configbytes(s, b'invalid')
876 885 Traceback (most recent call last):
877 886 ...
878 887 ConfigError: foo.invalid is not a byte quantity ('somevalue')
879 888 """
880 889
881 890 value = self._config(section, name, default, untrusted)
882 891 if value is _unset:
883 892 if default is _unset:
884 893 default = 0
885 894 value = default
886 895 if not isinstance(value, bytes):
887 896 return value
888 897 try:
889 898 return util.sizetoint(value)
890 899 except error.ParseError:
891 900 raise error.ConfigError(
892 901 _(b"%s.%s is not a byte quantity ('%s')")
893 902 % (section, name, value)
894 903 )
895 904
896 905 def configlist(self, section, name, default=_unset, untrusted=False):
897 906 """parse a configuration element as a list of comma/space separated
898 907 strings
899 908
900 909 >>> u = ui(); s = b'foo'
901 910 >>> u.setconfig(s, b'list1', b'this,is "a small" ,test')
902 911 >>> u.configlist(s, b'list1')
903 912 ['this', 'is', 'a small', 'test']
904 913 >>> u.setconfig(s, b'list2', b'this, is "a small" , test ')
905 914 >>> u.configlist(s, b'list2')
906 915 ['this', 'is', 'a small', 'test']
907 916 """
908 917 # default is not always a list
909 918 v = self.configwith(
910 919 stringutil.parselist, section, name, default, b'list', untrusted
911 920 )
912 921 if isinstance(v, bytes):
913 922 return stringutil.parselist(v)
914 923 elif v is None:
915 924 return []
916 925 return v
917 926
918 927 def configdate(self, section, name, default=_unset, untrusted=False):
919 928 """parse a configuration element as a tuple of ints
920 929
921 930 >>> u = ui(); s = b'foo'
922 931 >>> u.setconfig(s, b'date', b'0 0')
923 932 >>> u.configdate(s, b'date')
924 933 (0, 0)
925 934 """
926 935 if self.config(section, name, default, untrusted):
927 936 return self.configwith(
928 937 dateutil.parsedate, section, name, default, b'date', untrusted
929 938 )
930 939 if default is _unset:
931 940 return None
932 941 return default
933 942
934 943 def configdefault(self, section, name):
935 944 """returns the default value of the config item"""
936 945 item = self._knownconfig.get(section, {}).get(name)
937 946 itemdefault = None
938 947 if item is not None:
939 948 if callable(item.default):
940 949 itemdefault = item.default()
941 950 else:
942 951 itemdefault = item.default
943 952 return itemdefault
944 953
945 954 def hasconfig(self, section, name, untrusted=False):
946 955 return self._data(untrusted).hasitem(section, name)
947 956
948 957 def has_section(self, section, untrusted=False):
949 958 '''tell whether section exists in config.'''
950 959 return section in self._data(untrusted)
951 960
952 961 def configitems(self, section, untrusted=False, ignoresub=False):
953 962 items = self._data(untrusted).items(section)
954 963 if ignoresub:
955 964 items = [i for i in items if b':' not in i[0]]
956 965 if self.debugflag and not untrusted and self._reportuntrusted:
957 966 for k, v in self._ucfg.items(section):
958 967 if self._tcfg.get(section, k) != v:
959 968 self.debug(
960 969 b"ignoring untrusted configuration option "
961 970 b"%s.%s = %s\n" % (section, k, v)
962 971 )
963 972 return items
964 973
965 974 def walkconfig(self, untrusted=False, all_known=False):
966 975 defined = self._walk_config(untrusted)
967 976 if not all_known:
968 977 for d in defined:
969 978 yield d
970 979 return
971 980 known = self._walk_known()
972 981 current_defined = next(defined, None)
973 982 current_known = next(known, None)
974 983 while current_defined is not None or current_known is not None:
975 984 if current_defined is None:
976 985 yield current_known
977 986 current_known = next(known, None)
978 987 elif current_known is None:
979 988 yield current_defined
980 989 current_defined = next(defined, None)
981 990 elif current_known[0:2] == current_defined[0:2]:
982 991 yield current_defined
983 992 current_defined = next(defined, None)
984 993 current_known = next(known, None)
985 994 elif current_known[0:2] < current_defined[0:2]:
986 995 yield current_known
987 996 current_known = next(known, None)
988 997 else:
989 998 yield current_defined
990 999 current_defined = next(defined, None)
991 1000
992 1001 def _walk_known(self):
993 1002 for section, items in sorted(self._knownconfig.items()):
994 1003 for k, i in sorted(items.items()):
995 1004 # We don't have a way to display generic well, so skip them
996 1005 if i.generic:
997 1006 continue
998 1007 if callable(i.default):
999 1008 default = i.default()
1000 1009 elif i.default is configitems.dynamicdefault:
1001 1010 default = b'<DYNAMIC>'
1002 1011 else:
1003 1012 default = i.default
1004 1013 yield section, i.name, default
1005 1014
1006 1015 def _walk_config(self, untrusted):
1007 1016 cfg = self._data(untrusted)
1008 1017 for section in cfg.sections():
1009 1018 for name, value in self.configitems(section, untrusted):
1010 1019 yield section, name, value
1011 1020
1012 def plain(self, feature=None):
1021 def plain(self, feature: Optional[bytes] = None) -> bool:
1013 1022 """is plain mode active?
1014 1023
1015 1024 Plain mode means that all configuration variables which affect
1016 1025 the behavior and output of Mercurial should be
1017 1026 ignored. Additionally, the output should be stable,
1018 1027 reproducible and suitable for use in scripts or applications.
1019 1028
1020 1029 The only way to trigger plain mode is by setting either the
1021 1030 `HGPLAIN' or `HGPLAINEXCEPT' environment variables.
1022 1031
1023 1032 The return value can either be
1024 1033 - False if HGPLAIN is not set, or feature is in HGPLAINEXCEPT
1025 1034 - False if feature is disabled by default and not included in HGPLAIN
1026 1035 - True otherwise
1027 1036 """
1028 1037 if (
1029 1038 b'HGPLAIN' not in encoding.environ
1030 1039 and b'HGPLAINEXCEPT' not in encoding.environ
1031 1040 ):
1032 1041 return False
1033 1042 exceptions = (
1034 1043 encoding.environ.get(b'HGPLAINEXCEPT', b'').strip().split(b',')
1035 1044 )
1036 1045 # TODO: add support for HGPLAIN=+feature,-feature syntax
1037 1046 if b'+strictflags' not in encoding.environ.get(b'HGPLAIN', b'').split(
1038 1047 b','
1039 1048 ):
1040 1049 exceptions.append(b'strictflags')
1041 1050 if feature and exceptions:
1042 1051 return feature not in exceptions
1043 1052 return True
1044 1053
1045 1054 def username(self, acceptempty=False):
1046 1055 """Return default username to be used in commits.
1047 1056
1048 1057 Searched in this order: $HGUSER, [ui] section of hgrcs, $EMAIL
1049 1058 and stop searching if one of these is set.
1050 1059 If not found and acceptempty is True, returns None.
1051 1060 If not found and ui.askusername is True, ask the user, else use
1052 1061 ($LOGNAME or $USER or $LNAME or $USERNAME) + "@full.hostname".
1053 1062 If no username could be found, raise an Abort error.
1054 1063 """
1055 1064 user = encoding.environ.get(b"HGUSER")
1056 1065 if user is None:
1057 1066 user = self.config(b"ui", b"username")
1058 1067 if user is not None:
1059 1068 user = os.path.expandvars(user)
1060 1069 if user is None:
1061 1070 user = encoding.environ.get(b"EMAIL")
1062 1071 if user is None and acceptempty:
1063 1072 return user
1064 1073 if user is None and self.configbool(b"ui", b"askusername"):
1065 1074 user = self.prompt(_(b"enter a commit username:"), default=None)
1066 1075 if user is None and not self.interactive():
1067 1076 try:
1068 1077 user = b'%s@%s' % (
1069 1078 procutil.getuser(),
1070 1079 encoding.strtolocal(socket.getfqdn()),
1071 1080 )
1072 1081 self.warn(_(b"no username found, using '%s' instead\n") % user)
1073 1082 except KeyError:
1074 1083 pass
1075 1084 if not user:
1076 1085 raise error.Abort(
1077 1086 _(b'no username supplied'),
1078 1087 hint=_(b"use 'hg config --edit' " b'to set your username'),
1079 1088 )
1080 1089 if b"\n" in user:
1081 1090 raise error.Abort(
1082 1091 _(b"username %r contains a newline\n") % pycompat.bytestr(user)
1083 1092 )
1084 1093 return user
1085 1094
1086 def shortuser(self, user):
1095 def shortuser(self, user: bytes) -> bytes:
1087 1096 """Return a short representation of a user name or email address."""
1088 1097 if not self.verbose:
1089 1098 user = stringutil.shortuser(user)
1090 1099 return user
1091 1100
1092 1101 def expandpath(self, loc, default=None):
1093 1102 """Return repository location relative to cwd or from [paths]"""
1094 1103 msg = b'ui.expandpath is deprecated, use `get_*` functions from urlutil'
1095 1104 self.deprecwarn(msg, b'6.0')
1096 1105 try:
1097 1106 p = self.getpath(loc)
1098 1107 if p:
1099 1108 return p.rawloc
1100 1109 except error.RepoError:
1101 1110 pass
1102 1111
1103 1112 if default:
1104 1113 try:
1105 1114 p = self.getpath(default)
1106 1115 if p:
1107 1116 return p.rawloc
1108 1117 except error.RepoError:
1109 1118 pass
1110 1119
1111 1120 return loc
1112 1121
1113 1122 @util.propertycache
1114 1123 def paths(self):
1115 1124 return urlutil.paths(self)
1116 1125
1117 1126 def getpath(self, *args, **kwargs):
1118 1127 """see paths.getpath for details
1119 1128
1120 1129 This method exist as `getpath` need a ui for potential warning message.
1121 1130 """
1122 1131 msg = b'ui.getpath is deprecated, use `get_*` functions from urlutil'
1123 1132 self.deprecwarn(msg, b'6.0')
1124 1133 return self.paths.getpath(self, *args, **kwargs)
1125 1134
1126 1135 @property
1127 1136 def fout(self):
1128 1137 return self._fout
1129 1138
1130 1139 @fout.setter
1131 1140 def fout(self, f):
1132 1141 self._fout = f
1133 1142 self._fmsgout, self._fmsgerr = _selectmsgdests(self)
1134 1143
1135 1144 @property
1136 1145 def ferr(self):
1137 1146 return self._ferr
1138 1147
1139 1148 @ferr.setter
1140 1149 def ferr(self, f):
1141 1150 self._ferr = f
1142 1151 self._fmsgout, self._fmsgerr = _selectmsgdests(self)
1143 1152
1144 1153 @property
1145 1154 def fin(self):
1146 1155 return self._fin
1147 1156
1148 1157 @fin.setter
1149 1158 def fin(self, f):
1150 1159 self._fin = f
1151 1160
1152 1161 @property
1153 1162 def fmsg(self):
1154 1163 """Stream dedicated for status/error messages; may be None if
1155 1164 fout/ferr are used"""
1156 1165 return self._fmsg
1157 1166
1158 1167 @fmsg.setter
1159 1168 def fmsg(self, f):
1160 1169 self._fmsg = f
1161 1170 self._fmsgout, self._fmsgerr = _selectmsgdests(self)
1162 1171
1163 1172 @contextlib.contextmanager
1164 def silent(self, error=False, subproc=False, labeled=False):
1173 def silent(
1174 self, error: bool = False, subproc: bool = False, labeled: bool = False
1175 ):
1165 1176 self.pushbuffer(error=error, subproc=subproc, labeled=labeled)
1166 1177 try:
1167 1178 yield
1168 1179 finally:
1169 1180 self.popbuffer()
1170 1181
1171 def pushbuffer(self, error=False, subproc=False, labeled=False):
1182 def pushbuffer(
1183 self, error: bool = False, subproc: bool = False, labeled: bool = False
1184 ) -> None:
1172 1185 """install a buffer to capture standard output of the ui object
1173 1186
1174 1187 If error is True, the error output will be captured too.
1175 1188
1176 1189 If subproc is True, output from subprocesses (typically hooks) will be
1177 1190 captured too.
1178 1191
1179 1192 If labeled is True, any labels associated with buffered
1180 1193 output will be handled. By default, this has no effect
1181 1194 on the output returned, but extensions and GUI tools may
1182 1195 handle this argument and returned styled output. If output
1183 1196 is being buffered so it can be captured and parsed or
1184 1197 processed, labeled should not be set to True.
1185 1198 """
1186 1199 self._buffers.append([])
1187 1200 self._bufferstates.append((error, subproc, labeled))
1188 1201 self._bufferapplylabels = labeled
1189 1202
1190 def popbuffer(self):
1203 def popbuffer(self) -> bytes:
1191 1204 '''pop the last buffer and return the buffered output'''
1192 1205 self._bufferstates.pop()
1193 1206 if self._bufferstates:
1194 1207 self._bufferapplylabels = self._bufferstates[-1][2]
1195 1208 else:
1196 1209 self._bufferapplylabels = None
1197 1210
1198 1211 return b"".join(self._buffers.pop())
1199 1212
1200 def _isbuffered(self, dest):
1213 def _isbuffered(self, dest) -> bool:
1201 1214 if dest is self._fout:
1202 1215 return bool(self._buffers)
1203 1216 if dest is self._ferr:
1204 1217 return bool(self._bufferstates and self._bufferstates[-1][0])
1205 1218 return False
1206 1219
1207 def canwritewithoutlabels(self):
1220 def canwritewithoutlabels(self) -> bool:
1208 1221 '''check if write skips the label'''
1209 1222 if self._buffers and not self._bufferapplylabels:
1210 1223 return True
1211 1224 return self._colormode is None
1212 1225
1213 def canbatchlabeledwrites(self):
1226 def canbatchlabeledwrites(self) -> bool:
1214 1227 '''check if write calls with labels are batchable'''
1215 1228 # Windows color printing is special, see ``write``.
1216 1229 return self._colormode != b'win32'
1217 1230
1218 1231 def write(self, *args: bytes, **opts: _MsgOpts) -> None:
1219 1232 """write args to output
1220 1233
1221 1234 By default, this method simply writes to the buffer or stdout.
1222 1235 Color mode can be set on the UI class to have the output decorated
1223 1236 with color modifier before being written to stdout.
1224 1237
1225 1238 The color used is controlled by an optional keyword argument, "label".
1226 1239 This should be a string containing label names separated by space.
1227 1240 Label names take the form of "topic.type". For example, ui.debug()
1228 1241 issues a label of "ui.debug".
1229 1242
1230 1243 Progress reports via stderr are normally cleared before writing as
1231 1244 stdout and stderr go to the same terminal. This can be skipped with
1232 1245 the optional keyword argument "keepprogressbar". The progress bar
1233 1246 will continue to occupy a partial line on stderr in that case.
1234 1247 This functionality is intended when Mercurial acts as data source
1235 1248 in a pipe.
1236 1249
1237 1250 When labeling output for a specific command, a label of
1238 1251 "cmdname.type" is recommended. For example, status issues
1239 1252 a label of "status.modified" for modified files.
1240 1253 """
1241 1254 dest = self._fout
1242 1255
1243 1256 # inlined _write() for speed
1244 1257 if self._buffers:
1245 1258 label = opts.get('label', b'')
1246 1259 if label and self._bufferapplylabels:
1247 1260 self._buffers[-1].extend(self.label(a, label) for a in args)
1248 1261 else:
1249 1262 self._buffers[-1].extend(args)
1250 1263 return
1251 1264
1252 1265 # inlined _writenobuf() for speed
1253 1266 if not opts.get('keepprogressbar', False):
1254 1267 self._progclear()
1255 1268 msg = b''.join(args)
1256 1269
1257 1270 # opencode timeblockedsection because this is a critical path
1258 1271 starttime = util.timer()
1259 1272 try:
1260 1273 if self._colormode == b'win32':
1261 1274 # windows color printing is its own can of crab, defer to
1262 1275 # the color module and that is it.
1263 1276 color.win32print(self, dest.write, msg, **opts)
1264 1277 else:
1265 1278 if self._colormode is not None:
1266 1279 label = opts.get('label', b'')
1267 1280 msg = self.label(msg, label)
1268 1281 dest.write(msg)
1269 1282 except IOError as err:
1270 1283 raise error.StdioError(err)
1271 1284 finally:
1272 1285 self._blockedtimes[b'stdio_blocked'] += (
1273 1286 util.timer() - starttime
1274 1287 ) * 1000
1275 1288
1276 1289 def write_err(self, *args: bytes, **opts: _MsgOpts) -> None:
1277 1290 self._write(self._ferr, *args, **opts)
1278 1291
1279 1292 def _write(self, dest, *args: bytes, **opts: _MsgOpts) -> None:
1280 1293 # update write() as well if you touch this code
1281 1294 if self._isbuffered(dest):
1282 1295 label = opts.get('label', b'')
1283 1296 if label and self._bufferapplylabels:
1284 1297 self._buffers[-1].extend(self.label(a, label) for a in args)
1285 1298 else:
1286 1299 self._buffers[-1].extend(args)
1287 1300 else:
1288 1301 self._writenobuf(dest, *args, **opts)
1289 1302
1290 1303 def _writenobuf(self, dest, *args: bytes, **opts: _MsgOpts) -> None:
1291 1304 # update write() as well if you touch this code
1292 1305 if not opts.get('keepprogressbar', False):
1293 1306 self._progclear()
1294 1307 msg = b''.join(args)
1295 1308
1296 1309 # opencode timeblockedsection because this is a critical path
1297 1310 starttime = util.timer()
1298 1311 try:
1299 1312 if dest is self._ferr and not getattr(self._fout, 'closed', False):
1300 1313 self._fout.flush()
1301 1314 if getattr(dest, 'structured', False):
1302 1315 # channel for machine-readable output with metadata, where
1303 1316 # no extra colorization is necessary.
1304 1317 dest.write(msg, **opts)
1305 1318 elif self._colormode == b'win32':
1306 1319 # windows color printing is its own can of crab, defer to
1307 1320 # the color module and that is it.
1308 1321 color.win32print(self, dest.write, msg, **opts)
1309 1322 else:
1310 1323 if self._colormode is not None:
1311 1324 label = opts.get('label', b'')
1312 1325 msg = self.label(msg, label)
1313 1326 dest.write(msg)
1314 1327 # stderr may be buffered under win32 when redirected to files,
1315 1328 # including stdout.
1316 1329 if dest is self._ferr and not getattr(dest, 'closed', False):
1317 1330 dest.flush()
1318 1331 except IOError as err:
1319 1332 if dest is self._ferr and err.errno in (
1320 1333 errno.EPIPE,
1321 1334 errno.EIO,
1322 1335 errno.EBADF,
1323 1336 ):
1324 1337 # no way to report the error, so ignore it
1325 1338 return
1326 1339 raise error.StdioError(err)
1327 1340 finally:
1328 1341 self._blockedtimes[b'stdio_blocked'] += (
1329 1342 util.timer() - starttime
1330 1343 ) * 1000
1331 1344
1332 1345 def _writemsg(self, dest, *args: bytes, **opts: _MsgOpts) -> None:
1333 1346 timestamp = self.showtimestamp and opts.get('type') in {
1334 1347 b'debug',
1335 1348 b'error',
1336 1349 b'note',
1337 1350 b'status',
1338 1351 b'warning',
1339 1352 }
1340 1353 if timestamp:
1341 1354 args = (
1342 1355 b'[%s] '
1343 1356 % pycompat.bytestr(datetime.datetime.now().isoformat()),
1344 1357 ) + args
1345 1358 _writemsgwith(self._write, dest, *args, **opts)
1346 1359 if timestamp:
1347 1360 dest.flush()
1348 1361
1349 1362 def _writemsgnobuf(self, dest, *args: bytes, **opts: _MsgOpts) -> None:
1350 1363 _writemsgwith(self._writenobuf, dest, *args, **opts)
1351 1364
1352 1365 def flush(self) -> None:
1353 1366 # opencode timeblockedsection because this is a critical path
1354 1367 starttime = util.timer()
1355 1368 try:
1356 1369 try:
1357 1370 self._fout.flush()
1358 1371 except IOError as err:
1359 1372 if err.errno not in (errno.EPIPE, errno.EIO, errno.EBADF):
1360 1373 raise error.StdioError(err)
1361 1374 finally:
1362 1375 try:
1363 1376 self._ferr.flush()
1364 1377 except IOError as err:
1365 1378 if err.errno not in (errno.EPIPE, errno.EIO, errno.EBADF):
1366 1379 raise error.StdioError(err)
1367 1380 finally:
1368 1381 self._blockedtimes[b'stdio_blocked'] += (
1369 1382 util.timer() - starttime
1370 1383 ) * 1000
1371 1384
1372 def _isatty(self, fh):
1385 def _isatty(self, fh) -> bool:
1373 1386 if self.configbool(b'ui', b'nontty'):
1374 1387 return False
1375 1388 return procutil.isatty(fh)
1376 1389
1377 1390 def protectfinout(self):
1378 1391 """Duplicate ui streams and redirect original if they are stdio
1379 1392
1380 1393 Returns (fin, fout) which point to the original ui fds, but may be
1381 1394 copy of them. The returned streams can be considered "owned" in that
1382 1395 print(), exec(), etc. never reach to them.
1383 1396 """
1384 1397 if self._finoutredirected:
1385 1398 # if already redirected, protectstdio() would just create another
1386 1399 # nullfd pair, which is equivalent to returning self._fin/_fout.
1387 1400 return self._fin, self._fout
1388 1401 fin, fout = procutil.protectstdio(self._fin, self._fout)
1389 1402 self._finoutredirected = (fin, fout) != (self._fin, self._fout)
1390 1403 return fin, fout
1391 1404
1392 1405 def restorefinout(self, fin, fout):
1393 1406 """Restore ui streams from possibly duplicated (fin, fout)"""
1394 1407 if (fin, fout) == (self._fin, self._fout):
1395 1408 return
1396 1409 procutil.restorestdio(self._fin, self._fout, fin, fout)
1397 1410 # protectfinout() won't create more than one duplicated streams,
1398 1411 # so we can just turn the redirection flag off.
1399 1412 self._finoutredirected = False
1400 1413
1401 1414 @contextlib.contextmanager
1402 1415 def protectedfinout(self):
1403 1416 """Run code block with protected standard streams"""
1404 1417 fin, fout = self.protectfinout()
1405 1418 try:
1406 1419 yield fin, fout
1407 1420 finally:
1408 1421 self.restorefinout(fin, fout)
1409 1422
1410 def disablepager(self):
1423 def disablepager(self) -> None:
1411 1424 self._disablepager = True
1412 1425
1413 def pager(self, command):
1426 def pager(self, command: bytes) -> None:
1414 1427 """Start a pager for subsequent command output.
1415 1428
1416 1429 Commands which produce a long stream of output should call
1417 1430 this function to activate the user's preferred pagination
1418 1431 mechanism (which may be no pager). Calling this function
1419 1432 precludes any future use of interactive functionality, such as
1420 1433 prompting the user or activating curses.
1421 1434
1422 1435 Args:
1423 1436 command: The full, non-aliased name of the command. That is, "log"
1424 1437 not "history, "summary" not "summ", etc.
1425 1438 """
1426 1439 if self._disablepager or self.pageractive:
1427 1440 # how pager should do is already determined
1428 1441 return
1429 1442
1430 1443 if not command.startswith(b'internal-always-') and (
1431 1444 # explicit --pager=on (= 'internal-always-' prefix) should
1432 1445 # take precedence over disabling factors below
1433 1446 command in self.configlist(b'pager', b'ignore')
1434 1447 or not self.configbool(b'ui', b'paginate')
1435 1448 or not self.configbool(b'pager', b'attend-' + command, True)
1436 1449 or encoding.environ.get(b'TERM') == b'dumb'
1437 1450 # TODO: if we want to allow HGPLAINEXCEPT=pager,
1438 1451 # formatted() will need some adjustment.
1439 1452 or not self.formatted()
1440 1453 or self.plain()
1441 1454 or self._buffers
1442 1455 # TODO: expose debugger-enabled on the UI object
1443 1456 or b'--debugger' in pycompat.sysargv
1444 1457 ):
1445 1458 # We only want to paginate if the ui appears to be
1446 1459 # interactive, the user didn't say HGPLAIN or
1447 1460 # HGPLAINEXCEPT=pager, and the user didn't specify --debug.
1448 1461 return
1449 1462
1450 1463 # py2exe doesn't appear to be able to use legacy I/O, and nothing is
1451 1464 # output to the pager for paged commands. Piping to `more` in cmd.exe
1452 1465 # works, but is easy to forget. Just disable pager for py2exe, but
1453 1466 # leave it working for pyoxidizer and exewrapper builds.
1454 1467 if pycompat.iswindows and getattr(sys, "frozen", None) == "console_exe":
1455 1468 self.debug(b"pager is unavailable with py2exe packaging\n")
1456 1469 return
1457 1470
1458 1471 pagercmd = self.config(b'pager', b'pager', rcutil.fallbackpager)
1459 1472 if not pagercmd:
1460 1473 return
1461 1474
1462 1475 pagerenv = {}
1463 1476 for name, value in rcutil.defaultpagerenv().items():
1464 1477 if name not in encoding.environ:
1465 1478 pagerenv[name] = value
1466 1479
1467 1480 self.debug(
1468 1481 b'starting pager for command %s\n' % stringutil.pprint(command)
1469 1482 )
1470 1483 self.flush()
1471 1484
1472 1485 wasformatted = self.formatted()
1473 1486 if util.safehasattr(signal, b"SIGPIPE"):
1474 1487 signal.signal(signal.SIGPIPE, _catchterm)
1475 1488 if self._runpager(pagercmd, pagerenv):
1476 1489 self.pageractive = True
1477 1490 # Preserve the formatted-ness of the UI. This is important
1478 1491 # because we mess with stdout, which might confuse
1479 1492 # auto-detection of things being formatted.
1480 1493 self.setconfig(b'ui', b'formatted', wasformatted, b'pager')
1481 1494 self.setconfig(b'ui', b'interactive', False, b'pager')
1482 1495
1483 1496 # If pagermode differs from color.mode, reconfigure color now that
1484 1497 # pageractive is set.
1485 1498 cm = self._colormode
1486 1499 if cm != self.config(b'color', b'pagermode', cm):
1487 1500 color.setup(self)
1488 1501 else:
1489 1502 # If the pager can't be spawned in dispatch when --pager=on is
1490 1503 # given, don't try again when the command runs, to avoid a duplicate
1491 1504 # warning about a missing pager command.
1492 1505 self.disablepager()
1493 1506
1494 def _runpager(self, command, env=None):
1507 def _runpager(self, command: bytes, env=None) -> bool:
1495 1508 """Actually start the pager and set up file descriptors.
1496 1509
1497 1510 This is separate in part so that extensions (like chg) can
1498 1511 override how a pager is invoked.
1499 1512 """
1500 1513 if command == b'cat':
1501 1514 # Save ourselves some work.
1502 1515 return False
1503 1516 # If the command doesn't contain any of these characters, we
1504 1517 # assume it's a binary and exec it directly. This means for
1505 1518 # simple pager command configurations, we can degrade
1506 1519 # gracefully and tell the user about their broken pager.
1507 1520 shell = any(c in command for c in b"|&;<>()$`\\\"' \t\n*?[#~=%")
1508 1521
1509 1522 if pycompat.iswindows and not shell:
1510 1523 # Window's built-in `more` cannot be invoked with shell=False, but
1511 1524 # its `more.com` can. Hide this implementation detail from the
1512 1525 # user so we can also get sane bad PAGER behavior. MSYS has
1513 1526 # `more.exe`, so do a cmd.exe style resolution of the executable to
1514 1527 # determine which one to use.
1515 1528 fullcmd = procutil.findexe(command)
1516 1529 if not fullcmd:
1517 1530 self.warn(
1518 1531 _(b"missing pager command '%s', skipping pager\n") % command
1519 1532 )
1520 1533 return False
1521 1534
1522 1535 command = fullcmd
1523 1536
1524 1537 try:
1525 1538 pager = subprocess.Popen(
1526 1539 procutil.tonativestr(command),
1527 1540 shell=shell,
1528 1541 bufsize=-1,
1529 1542 close_fds=procutil.closefds,
1530 1543 stdin=subprocess.PIPE,
1531 1544 stdout=procutil.stdout,
1532 1545 stderr=procutil.stderr,
1533 1546 env=procutil.tonativeenv(procutil.shellenviron(env)),
1534 1547 )
1535 1548 except FileNotFoundError:
1536 1549 if not shell:
1537 1550 self.warn(
1538 1551 _(b"missing pager command '%s', skipping pager\n") % command
1539 1552 )
1540 1553 return False
1541 1554 raise
1542 1555
1543 1556 # back up original file descriptors
1544 1557 stdoutfd = os.dup(procutil.stdout.fileno())
1545 1558 stderrfd = os.dup(procutil.stderr.fileno())
1546 1559
1547 1560 os.dup2(pager.stdin.fileno(), procutil.stdout.fileno())
1548 1561 if self._isatty(procutil.stderr):
1549 1562 os.dup2(pager.stdin.fileno(), procutil.stderr.fileno())
1550 1563
1551 1564 @self.atexit
1552 1565 def killpager():
1553 1566 if util.safehasattr(signal, b"SIGINT"):
1554 1567 signal.signal(signal.SIGINT, signal.SIG_IGN)
1555 1568 # restore original fds, closing pager.stdin copies in the process
1556 1569 os.dup2(stdoutfd, procutil.stdout.fileno())
1557 1570 os.dup2(stderrfd, procutil.stderr.fileno())
1558 1571 pager.stdin.close()
1559 1572 pager.wait()
1560 1573
1561 1574 return True
1562 1575
1563 1576 @property
1564 1577 def _exithandlers(self):
1565 1578 return _reqexithandlers
1566 1579
1567 1580 def atexit(self, func, *args, **kwargs):
1568 1581 """register a function to run after dispatching a request
1569 1582
1570 1583 Handlers do not stay registered across request boundaries."""
1571 1584 self._exithandlers.append((func, args, kwargs))
1572 1585 return func
1573 1586
1574 def interface(self, feature):
1587 def interface(self, feature: bytes) -> bytes:
1575 1588 """what interface to use for interactive console features?
1576 1589
1577 1590 The interface is controlled by the value of `ui.interface` but also by
1578 1591 the value of feature-specific configuration. For example:
1579 1592
1580 1593 ui.interface.histedit = text
1581 1594 ui.interface.chunkselector = curses
1582 1595
1583 1596 Here the features are "histedit" and "chunkselector".
1584 1597
1585 1598 The configuration above means that the default interfaces for commands
1586 1599 is curses, the interface for histedit is text and the interface for
1587 1600 selecting chunk is crecord (the best curses interface available).
1588 1601
1589 1602 Consider the following example:
1590 1603 ui.interface = curses
1591 1604 ui.interface.histedit = text
1592 1605
1593 1606 Then histedit will use the text interface and chunkselector will use
1594 1607 the default curses interface (crecord at the moment).
1595 1608 """
1596 1609 alldefaults = frozenset([b"text", b"curses"])
1597 1610
1598 1611 featureinterfaces = {
1599 1612 b"chunkselector": [
1600 1613 b"text",
1601 1614 b"curses",
1602 1615 ],
1603 1616 b"histedit": [
1604 1617 b"text",
1605 1618 b"curses",
1606 1619 ],
1607 1620 }
1608 1621
1609 1622 # Feature-specific interface
1610 1623 if feature not in featureinterfaces.keys():
1611 1624 # Programming error, not user error
1612 1625 raise ValueError(b"Unknown feature requested %s" % feature)
1613 1626
1614 1627 availableinterfaces = frozenset(featureinterfaces[feature])
1615 1628 if alldefaults > availableinterfaces:
1616 1629 # Programming error, not user error. We need a use case to
1617 1630 # define the right thing to do here.
1618 1631 raise ValueError(
1619 1632 b"Feature %s does not handle all default interfaces" % feature
1620 1633 )
1621 1634
1622 1635 if self.plain() or encoding.environ.get(b'TERM') == b'dumb':
1623 1636 return b"text"
1624 1637
1625 1638 # Default interface for all the features
1626 1639 defaultinterface = b"text"
1627 1640 i = self.config(b"ui", b"interface")
1628 1641 if i in alldefaults:
1629 defaultinterface = i
1642 defaultinterface = cast(bytes, i) # cast to help pytype
1630 1643
1631 choseninterface = defaultinterface
1644 choseninterface: bytes = defaultinterface
1632 1645 f = self.config(b"ui", b"interface.%s" % feature)
1633 1646 if f in availableinterfaces:
1634 choseninterface = f
1647 choseninterface = cast(bytes, f) # cast to help pytype
1635 1648
1636 1649 if i is not None and defaultinterface != i:
1637 1650 if f is not None:
1638 1651 self.warn(_(b"invalid value for ui.interface: %s\n") % (i,))
1639 1652 else:
1640 1653 self.warn(
1641 1654 _(b"invalid value for ui.interface: %s (using %s)\n")
1642 1655 % (i, choseninterface)
1643 1656 )
1644 1657 if f is not None and choseninterface != f:
1645 1658 self.warn(
1646 1659 _(b"invalid value for ui.interface.%s: %s (using %s)\n")
1647 1660 % (feature, f, choseninterface)
1648 1661 )
1649 1662
1650 1663 return choseninterface
1651 1664
1652 1665 def interactive(self):
1653 1666 """is interactive input allowed?
1654 1667
1655 1668 An interactive session is a session where input can be reasonably read
1656 1669 from `sys.stdin'. If this function returns false, any attempt to read
1657 1670 from stdin should fail with an error, unless a sensible default has been
1658 1671 specified.
1659 1672
1660 1673 Interactiveness is triggered by the value of the `ui.interactive'
1661 1674 configuration variable or - if it is unset - when `sys.stdin' points
1662 1675 to a terminal device.
1663 1676
1664 1677 This function refers to input only; for output, see `ui.formatted()'.
1665 1678 """
1666 1679 i = self.configbool(b"ui", b"interactive")
1667 1680 if i is None:
1668 1681 # some environments replace stdin without implementing isatty
1669 1682 # usually those are non-interactive
1670 1683 return self._isatty(self._fin)
1671 1684
1672 1685 return i
1673 1686
1674 def termwidth(self):
1687 def termwidth(self) -> int:
1675 1688 """how wide is the terminal in columns?"""
1676 1689 if b'COLUMNS' in encoding.environ:
1677 1690 try:
1678 1691 return int(encoding.environ[b'COLUMNS'])
1679 1692 except ValueError:
1680 1693 pass
1681 1694 return scmutil.termsize(self)[0]
1682 1695
1683 1696 def formatted(self):
1684 1697 """should formatted output be used?
1685 1698
1686 1699 It is often desirable to format the output to suite the output medium.
1687 1700 Examples of this are truncating long lines or colorizing messages.
1688 1701 However, this is not often not desirable when piping output into other
1689 1702 utilities, e.g. `grep'.
1690 1703
1691 1704 Formatted output is triggered by the value of the `ui.formatted'
1692 1705 configuration variable or - if it is unset - when `sys.stdout' points
1693 1706 to a terminal device. Please note that `ui.formatted' should be
1694 1707 considered an implementation detail; it is not intended for use outside
1695 1708 Mercurial or its extensions.
1696 1709
1697 1710 This function refers to output only; for input, see `ui.interactive()'.
1698 1711 This function always returns false when in plain mode, see `ui.plain()'.
1699 1712 """
1700 1713 if self.plain():
1701 1714 return False
1702 1715
1703 1716 i = self.configbool(b"ui", b"formatted")
1704 1717 if i is None:
1705 1718 # some environments replace stdout without implementing isatty
1706 1719 # usually those are non-interactive
1707 1720 return self._isatty(self._fout)
1708 1721
1709 1722 return i
1710 1723
1711 1724 def _readline(
1712 1725 self,
1713 1726 prompt: bytes = b' ',
1714 1727 promptopts: Optional[Dict[str, _MsgOpts]] = None,
1715 1728 ) -> bytes:
1716 1729 # Replacing stdin/stdout temporarily is a hard problem on Python 3
1717 1730 # because they have to be text streams with *no buffering*. Instead,
1718 1731 # we use rawinput() only if call_readline() will be invoked by
1719 1732 # PyOS_Readline(), so no I/O will be made at Python layer.
1720 1733 usereadline = (
1721 1734 self._isatty(self._fin)
1722 1735 and self._isatty(self._fout)
1723 1736 and procutil.isstdin(self._fin)
1724 1737 and procutil.isstdout(self._fout)
1725 1738 )
1726 1739 if usereadline:
1727 1740 try:
1728 1741 # magically add command line editing support, where
1729 1742 # available
1730 1743 import readline
1731 1744
1732 1745 # force demandimport to really load the module
1733 1746 readline.read_history_file
1734 1747 # windows sometimes raises something other than ImportError
1735 1748 except Exception:
1736 1749 usereadline = False
1737 1750
1738 1751 if self._colormode == b'win32' or not usereadline:
1739 1752 if not promptopts:
1740 1753 promptopts = {}
1741 1754 self._writemsgnobuf(
1742 1755 self._fmsgout, prompt, type=b'prompt', **promptopts
1743 1756 )
1744 1757 self.flush()
1745 1758 prompt = b' '
1746 1759 else:
1747 1760 prompt = self.label(prompt, b'ui.prompt') + b' '
1748 1761
1749 1762 # prompt ' ' must exist; otherwise readline may delete entire line
1750 1763 # - http://bugs.python.org/issue12833
1751 1764 with self.timeblockedsection(b'stdio'):
1752 1765 if usereadline:
1753 1766 self.flush()
1754 1767 prompt = encoding.strfromlocal(prompt)
1755 1768 line = encoding.strtolocal(input(prompt))
1756 1769 # When stdin is in binary mode on Windows, it can cause
1757 1770 # input() to emit an extra trailing carriage return
1758 1771 if pycompat.oslinesep == b'\r\n' and line.endswith(b'\r'):
1759 1772 line = line[:-1]
1760 1773 else:
1761 1774 self._fout.write(pycompat.bytestr(prompt))
1762 1775 self._fout.flush()
1763 1776 line = self._fin.readline()
1764 1777 if not line:
1765 1778 raise EOFError
1766 1779 line = line.rstrip(pycompat.oslinesep)
1767 1780
1768 1781 return line
1769 1782
1770 1783 def prompt(self, msg, default=b"y"):
1771 1784 """Prompt user with msg, read response.
1772 1785 If ui is not interactive, the default is returned.
1773 1786 """
1774 1787 return self._prompt(msg, default=default)
1775 1788
1776 1789 def _prompt(self, msg, **opts):
1777 1790 default = opts['default']
1778 1791 if not self.interactive():
1779 1792 self._writemsg(self._fmsgout, msg, b' ', type=b'prompt', **opts)
1780 1793 self._writemsg(
1781 1794 self._fmsgout, default or b'', b"\n", type=b'promptecho'
1782 1795 )
1783 1796 return default
1784 1797 try:
1785 1798 r = self._readline(prompt=msg, promptopts=opts)
1786 1799 if not r:
1787 1800 r = default
1788 1801 if self.configbool(b'ui', b'promptecho'):
1789 1802 self._writemsg(
1790 1803 self._fmsgout, r or b'', b"\n", type=b'promptecho'
1791 1804 )
1792 1805 return r
1793 1806 except EOFError:
1794 1807 raise error.ResponseExpected()
1795 1808
1796 1809 @staticmethod
1797 1810 def extractchoices(prompt: bytes) -> Tuple[bytes, List[_PromptChoice]]:
1798 1811 """Extract prompt message and list of choices from specified prompt.
1799 1812
1800 1813 This returns tuple "(message, choices)", and "choices" is the
1801 1814 list of tuple "(response character, text without &)".
1802 1815
1803 1816 >>> ui.extractchoices(b"awake? $$ &Yes $$ &No")
1804 1817 ('awake? ', [('y', 'Yes'), ('n', 'No')])
1805 1818 >>> ui.extractchoices(b"line\\nbreak? $$ &Yes $$ &No")
1806 1819 ('line\\nbreak? ', [('y', 'Yes'), ('n', 'No')])
1807 1820 >>> ui.extractchoices(b"want lots of $$money$$?$$Ye&s$$N&o")
1808 1821 ('want lots of $$money$$?', [('s', 'Yes'), ('o', 'No')])
1809 1822 """
1810 1823
1811 1824 # Sadly, the prompt string may have been built with a filename
1812 1825 # containing "$$" so let's try to find the first valid-looking
1813 1826 # prompt to start parsing. Sadly, we also can't rely on
1814 1827 # choices containing spaces, ASCII, or basically anything
1815 1828 # except an ampersand followed by a character.
1816 1829 m = re.match(br'(?s)(.+?)\$\$([^$]*&[^ $].*)', prompt)
1817 1830
1818 1831 assert m is not None # help pytype
1819 1832
1820 1833 msg = m.group(1)
1821 1834 choices = [p.strip(b' ') for p in m.group(2).split(b'$$')]
1822 1835
1823 1836 def choicetuple(s):
1824 1837 ampidx = s.index(b'&')
1825 1838 return s[ampidx + 1 : ampidx + 2].lower(), s.replace(b'&', b'', 1)
1826 1839
1827 1840 return (msg, [choicetuple(s) for s in choices])
1828 1841
1829 1842 def promptchoice(self, prompt: bytes, default: int = 0) -> int:
1830 1843 """Prompt user with a message, read response, and ensure it matches
1831 1844 one of the provided choices. The prompt is formatted as follows:
1832 1845
1833 1846 "would you like fries with that (Yn)? $$ &Yes $$ &No"
1834 1847
1835 1848 The index of the choice is returned. Responses are case
1836 1849 insensitive. If ui is not interactive, the default is
1837 1850 returned.
1838 1851 """
1839 1852
1840 1853 msg, choices = self.extractchoices(prompt)
1841 1854 resps = [r for r, t in choices]
1842 1855 while True:
1843 1856 r = self._prompt(msg, default=resps[default], choices=choices)
1844 1857 if r.lower() in resps:
1845 1858 return resps.index(r.lower())
1846 1859 # TODO: shouldn't it be a warning?
1847 1860 self._writemsg(self._fmsgout, _(b"unrecognized response\n"))
1848 1861
1849 1862 def getpass(
1850 1863 self, prompt: Optional[bytes] = None, default: Optional[bytes] = None
1851 1864 ) -> Optional[bytes]:
1852 1865 if not self.interactive():
1853 1866 return default
1854 1867 try:
1855 1868 self._writemsg(
1856 1869 self._fmsgerr,
1857 1870 prompt or _(b'password: '),
1858 1871 type=b'prompt',
1859 1872 password=True,
1860 1873 )
1861 1874 # disable getpass() only if explicitly specified. it's still valid
1862 1875 # to interact with tty even if fin is not a tty.
1863 1876 with self.timeblockedsection(b'stdio'):
1864 1877 if self.configbool(b'ui', b'nontty'):
1865 1878 l = self._fin.readline()
1866 1879 if not l:
1867 1880 raise EOFError
1868 1881 return l.rstrip(b'\n')
1869 1882 else:
1870 1883 return util.get_password()
1871 1884 except EOFError:
1872 1885 raise error.ResponseExpected()
1873 1886
1874 1887 def status(self, *msg: bytes, **opts: _MsgOpts) -> None:
1875 1888 """write status message to output (if ui.quiet is False)
1876 1889
1877 1890 This adds an output label of "ui.status".
1878 1891 """
1879 1892 if not self.quiet:
1880 1893 self._writemsg(self._fmsgout, type=b'status', *msg, **opts)
1881 1894
1882 1895 def warn(self, *msg: bytes, **opts: _MsgOpts) -> None:
1883 1896 """write warning message to output (stderr)
1884 1897
1885 1898 This adds an output label of "ui.warning".
1886 1899 """
1887 1900 self._writemsg(self._fmsgerr, type=b'warning', *msg, **opts)
1888 1901
1889 1902 def error(self, *msg: bytes, **opts: _MsgOpts) -> None:
1890 1903 """write error message to output (stderr)
1891 1904
1892 1905 This adds an output label of "ui.error".
1893 1906 """
1894 1907 self._writemsg(self._fmsgerr, type=b'error', *msg, **opts)
1895 1908
1896 1909 def note(self, *msg: bytes, **opts: _MsgOpts) -> None:
1897 1910 """write note to output (if ui.verbose is True)
1898 1911
1899 1912 This adds an output label of "ui.note".
1900 1913 """
1901 1914 if self.verbose:
1902 1915 self._writemsg(self._fmsgout, type=b'note', *msg, **opts)
1903 1916
1904 1917 def debug(self, *msg: bytes, **opts: _MsgOpts) -> None:
1905 1918 """write debug message to output (if ui.debugflag is True)
1906 1919
1907 1920 This adds an output label of "ui.debug".
1908 1921 """
1909 1922 if self.debugflag:
1910 1923 self._writemsg(self._fmsgout, type=b'debug', *msg, **opts)
1911 1924 self.log(b'debug', b'%s', b''.join(msg))
1912 1925
1913 1926 # Aliases to defeat check-code.
1914 1927 statusnoi18n = status
1915 1928 notenoi18n = note
1916 1929 warnnoi18n = warn
1917 1930 writenoi18n = write
1918 1931
1919 1932 def edit(
1920 1933 self,
1921 text,
1922 user,
1923 extra=None,
1934 text: bytes,
1935 user: bytes,
1936 extra: Optional[Dict[bytes, Any]] = None, # TODO: value type of bytes?
1924 1937 editform=None,
1925 1938 pending=None,
1926 repopath=None,
1927 action=None,
1928 ):
1939 repopath: Optional[bytes] = None,
1940 action: Optional[bytes] = None,
1941 ) -> bytes:
1929 1942 if action is None:
1930 1943 self.develwarn(
1931 1944 b'action is None but will soon be a required '
1932 1945 b'parameter to ui.edit()'
1933 1946 )
1934 1947 extra_defaults = {
1935 1948 b'prefix': b'editor',
1936 1949 b'suffix': b'.txt',
1937 1950 }
1938 1951 if extra is not None:
1939 1952 if extra.get(b'suffix') is not None:
1940 1953 self.develwarn(
1941 1954 b'extra.suffix is not None but will soon be '
1942 1955 b'ignored by ui.edit()'
1943 1956 )
1944 1957 extra_defaults.update(extra)
1945 1958 extra = extra_defaults
1946 1959
1947 1960 if action == b'diff':
1948 1961 suffix = b'.diff'
1949 1962 elif action:
1950 1963 suffix = b'.%s.hg.txt' % action
1951 1964 else:
1952 1965 suffix = extra[b'suffix']
1953 1966
1954 1967 rdir = None
1955 1968 if self.configbool(b'experimental', b'editortmpinhg'):
1956 1969 rdir = repopath
1957 1970 (fd, name) = pycompat.mkstemp(
1958 1971 prefix=b'hg-' + extra[b'prefix'] + b'-', suffix=suffix, dir=rdir
1959 1972 )
1960 1973 try:
1961 1974 with os.fdopen(fd, 'wb') as f:
1962 1975 f.write(util.tonativeeol(text))
1963 1976
1964 1977 environ = {b'HGUSER': user}
1965 1978 if b'transplant_source' in extra:
1966 1979 environ.update(
1967 1980 {b'HGREVISION': hex(extra[b'transplant_source'])}
1968 1981 )
1969 1982 for label in (b'intermediate-source', b'source', b'rebase_source'):
1970 1983 if label in extra:
1971 1984 environ.update({b'HGREVISION': extra[label]})
1972 1985 break
1973 1986 if editform:
1974 1987 environ.update({b'HGEDITFORM': editform})
1975 1988 if pending:
1976 1989 environ.update({b'HG_PENDING': pending})
1977 1990
1978 1991 editor = self.geteditor()
1979 1992
1980 1993 self.system(
1981 1994 b"%s \"%s\"" % (editor, name),
1982 1995 environ=environ,
1983 1996 onerr=error.CanceledError,
1984 1997 errprefix=_(b"edit failed"),
1985 1998 blockedtag=b'editor',
1986 1999 )
1987 2000
1988 2001 with open(name, 'rb') as f:
1989 2002 t = util.fromnativeeol(f.read())
1990 2003 finally:
1991 2004 os.unlink(name)
1992 2005
1993 2006 return t
1994 2007
1995 2008 def system(
1996 2009 self,
1997 cmd,
2010 cmd: bytes,
1998 2011 environ=None,
1999 cwd=None,
2000 onerr=None,
2001 errprefix=None,
2002 blockedtag=None,
2003 ):
2012 cwd: Optional[bytes] = None,
2013 onerr: Optional[Callable[[bytes], Exception]] = None,
2014 errprefix: Optional[bytes] = None,
2015 blockedtag: Optional[bytes] = None,
2016 ) -> int:
2004 2017 """execute shell command with appropriate output stream. command
2005 2018 output will be redirected if fout is not stdout.
2006 2019
2007 2020 if command fails and onerr is None, return status, else raise onerr
2008 2021 object as exception.
2009 2022 """
2010 2023 if blockedtag is None:
2011 2024 # Long cmds tend to be because of an absolute path on cmd. Keep
2012 2025 # the tail end instead
2013 2026 cmdsuffix = cmd.translate(None, _keepalnum)[-85:]
2014 2027 blockedtag = b'unknown_system_' + cmdsuffix
2015 2028 out = self._fout
2016 2029 if any(s[1] for s in self._bufferstates):
2017 2030 out = self
2018 2031 with self.timeblockedsection(blockedtag):
2019 2032 rc = self._runsystem(cmd, environ=environ, cwd=cwd, out=out)
2020 2033 if rc and onerr:
2021 2034 errmsg = b'%s %s' % (
2022 2035 procutil.shellsplit(cmd)[0],
2023 2036 procutil.explainexit(rc),
2024 2037 )
2025 2038 if errprefix:
2026 2039 errmsg = b'%s: %s' % (errprefix, errmsg)
2027 2040 raise onerr(errmsg)
2028 2041 return rc
2029 2042
2030 def _runsystem(self, cmd, environ, cwd, out):
2043 def _runsystem(self, cmd: bytes, environ, cwd: Optional[bytes], out) -> int:
2031 2044 """actually execute the given shell command (can be overridden by
2032 2045 extensions like chg)"""
2033 2046 return procutil.system(cmd, environ=environ, cwd=cwd, out=out)
2034 2047
2035 def traceback(self, exc=None, force=False):
2048 def traceback(self, exc=None, force: bool = False):
2036 2049 """print exception traceback if traceback printing enabled or forced.
2037 2050 only to call in exception handler. returns true if traceback
2038 2051 printed."""
2039 2052 if self.tracebackflag or force:
2040 2053 if exc is None:
2041 2054 exc = sys.exc_info()
2042 2055 cause = getattr(exc[1], 'cause', None)
2043 2056
2044 2057 if cause is not None:
2045 2058 causetb = traceback.format_tb(cause[2])
2046 2059 exctb = traceback.format_tb(exc[2])
2047 2060 exconly = traceback.format_exception_only(cause[0], cause[1])
2048 2061
2049 2062 # exclude frame where 'exc' was chained and rethrown from exctb
2050 2063 self.write_err(
2051 2064 b'Traceback (most recent call last):\n',
2052 2065 encoding.strtolocal(''.join(exctb[:-1])),
2053 2066 encoding.strtolocal(''.join(causetb)),
2054 2067 encoding.strtolocal(''.join(exconly)),
2055 2068 )
2056 2069 else:
2057 2070 output = traceback.format_exception(exc[0], exc[1], exc[2])
2058 2071 self.write_err(encoding.strtolocal(''.join(output)))
2059 2072 return self.tracebackflag or force
2060 2073
2061 2074 def geteditor(self):
2062 2075 '''return editor to use'''
2063 2076 if pycompat.sysplatform == b'plan9':
2064 2077 # vi is the MIPS instruction simulator on Plan 9. We
2065 2078 # instead default to E to plumb commit messages to
2066 2079 # avoid confusion.
2067 2080 editor = b'E'
2068 2081 elif pycompat.isdarwin:
2069 2082 # vi on darwin is POSIX compatible to a fault, and that includes
2070 2083 # exiting non-zero if you make any mistake when running an ex
2071 2084 # command. Proof: `vi -c ':unknown' -c ':qa'; echo $?` produces 1,
2072 2085 # while s/vi/vim/ doesn't.
2073 2086 editor = b'vim'
2074 2087 else:
2075 2088 editor = b'vi'
2076 2089 return encoding.environ.get(b"HGEDITOR") or self.config(
2077 2090 b"ui", b"editor", editor
2078 2091 )
2079 2092
2080 2093 @util.propertycache
2081 2094 def _progbar(self) -> Optional[progress.progbar]:
2082 2095 """setup the progbar singleton to the ui object"""
2083 2096 if (
2084 2097 self.quiet
2085 2098 or self.debugflag
2086 2099 or self.configbool(b'progress', b'disable')
2087 2100 or not progress.shouldprint(self)
2088 2101 ):
2089 2102 return None
2090 2103 return getprogbar(self)
2091 2104
2092 2105 def _progclear(self) -> None:
2093 2106 """clear progress bar output if any. use it before any output"""
2094 2107 if not haveprogbar(): # nothing loaded yet
2095 2108 return
2096 2109 if self._progbar is not None and self._progbar.printed:
2097 2110 self._progbar.clear()
2098 2111
2099 2112 def makeprogress(
2100 2113 self, topic: bytes, unit: bytes = b"", total: Optional[int] = None
2101 2114 ) -> scmutil.progress:
2102 2115 """Create a progress helper for the specified topic"""
2103 2116 if getattr(self._fmsgerr, 'structured', False):
2104 2117 # channel for machine-readable output with metadata, just send
2105 2118 # raw information
2106 2119 # TODO: consider porting some useful information (e.g. estimated
2107 2120 # time) from progbar. we might want to support update delay to
2108 2121 # reduce the cost of transferring progress messages.
2109 2122 def updatebar(topic, pos, item, unit, total):
2110 2123 self._fmsgerr.write(
2111 2124 None,
2112 2125 type=b'progress',
2113 2126 topic=topic,
2114 2127 pos=pos,
2115 2128 item=item,
2116 2129 unit=unit,
2117 2130 total=total,
2118 2131 )
2119 2132
2120 2133 elif self._progbar is not None:
2121 2134 updatebar = self._progbar.progress
2122 2135 else:
2123 2136
2124 2137 def updatebar(topic, pos, item, unit, total):
2125 2138 pass
2126 2139
2127 2140 return scmutil.progress(self, updatebar, topic, unit, total)
2128 2141
2129 2142 def getlogger(self, name):
2130 2143 """Returns a logger of the given name; or None if not registered"""
2131 2144 return self._loggers.get(name)
2132 2145
2133 def setlogger(self, name, logger):
2146 def setlogger(self, name, logger) -> None:
2134 2147 """Install logger which can be identified later by the given name
2135 2148
2136 2149 More than one loggers can be registered. Use extension or module
2137 2150 name to uniquely identify the logger instance.
2138 2151 """
2139 2152 self._loggers[name] = logger
2140 2153
2141 def log(self, event, msgfmt, *msgargs, **opts):
2154 def log(self, event, msgfmt, *msgargs, **opts) -> None:
2142 2155 """hook for logging facility extensions
2143 2156
2144 2157 event should be a readily-identifiable subsystem, which will
2145 2158 allow filtering.
2146 2159
2147 2160 msgfmt should be a newline-terminated format string to log, and
2148 2161 *msgargs are %-formatted into it.
2149 2162
2150 2163 **opts currently has no defined meanings.
2151 2164 """
2152 2165 if not self._loggers:
2153 2166 return
2154 2167 activeloggers = [l for l in self._loggers.values() if l.tracked(event)]
2155 2168 if not activeloggers:
2156 2169 return
2157 2170 msg = msgfmt % msgargs
2158 2171 opts = pycompat.byteskwargs(opts)
2159 2172 # guard against recursion from e.g. ui.debug()
2160 2173 registeredloggers = self._loggers
2161 2174 self._loggers = {}
2162 2175 try:
2163 2176 for logger in activeloggers:
2164 2177 logger.log(self, event, msg, opts)
2165 2178 finally:
2166 2179 self._loggers = registeredloggers
2167 2180
2168 2181 def label(self, msg: bytes, label: bytes) -> bytes:
2169 2182 """style msg based on supplied label
2170 2183
2171 2184 If some color mode is enabled, this will add the necessary control
2172 2185 characters to apply such color. In addition, 'debug' color mode adds
2173 2186 markup showing which label affects a piece of text.
2174 2187
2175 2188 ui.write(s, 'label') is equivalent to
2176 2189 ui.write(ui.label(s, 'label')).
2177 2190 """
2178 2191 if self._colormode is not None:
2179 2192 return color.colorlabel(self, msg, label)
2180 2193 return msg
2181 2194
2182 2195 def develwarn(
2183 2196 self, msg: bytes, stacklevel: int = 1, config: Optional[bytes] = None
2184 2197 ) -> None:
2185 2198 """issue a developer warning message
2186 2199
2187 2200 Use 'stacklevel' to report the offender some layers further up in the
2188 2201 stack.
2189 2202 """
2190 2203 if not self.configbool(b'devel', b'all-warnings'):
2191 2204 if config is None or not self.configbool(b'devel', config):
2192 2205 return
2193 2206 msg = b'devel-warn: ' + msg
2194 2207 stacklevel += 1 # get in develwarn
2195 2208 if self.tracebackflag:
2196 2209 util.debugstacktrace(msg, stacklevel, self._ferr, self._fout)
2197 2210 self.log(
2198 2211 b'develwarn',
2199 2212 b'%s at:\n%s'
2200 2213 % (msg, b''.join(util.getstackframes(stacklevel))),
2201 2214 )
2202 2215 else:
2203 2216 curframe = inspect.currentframe()
2204 2217 calframe = inspect.getouterframes(curframe, 2)
2205 2218 fname, lineno, fmsg = calframe[stacklevel][1:4]
2206 2219 fname, fmsg = pycompat.sysbytes(fname), pycompat.sysbytes(fmsg)
2207 2220 self.write_err(b'%s at: %s:%d (%s)\n' % (msg, fname, lineno, fmsg))
2208 2221 self.log(
2209 2222 b'develwarn', b'%s at: %s:%d (%s)\n', msg, fname, lineno, fmsg
2210 2223 )
2211 2224
2212 2225 # avoid cycles
2213 2226 del curframe
2214 2227 del calframe
2215 2228
2216 2229 def deprecwarn(
2217 2230 self, msg: bytes, version: bytes, stacklevel: int = 2
2218 2231 ) -> None:
2219 2232 """issue a deprecation warning
2220 2233
2221 2234 - msg: message explaining what is deprecated and how to upgrade,
2222 2235 - version: last version where the API will be supported,
2223 2236 """
2224 2237 if not (
2225 2238 self.configbool(b'devel', b'all-warnings')
2226 2239 or self.configbool(b'devel', b'deprec-warn')
2227 2240 ):
2228 2241 return
2229 2242 msg += (
2230 2243 b"\n(compatibility will be dropped after Mercurial-%s,"
2231 2244 b" update your code.)"
2232 2245 ) % version
2233 2246 self.develwarn(msg, stacklevel=stacklevel, config=b'deprec-warn')
2234 2247
2235 2248 def exportableenviron(self):
2236 2249 """The environment variables that are safe to export, e.g. through
2237 2250 hgweb.
2238 2251 """
2239 2252 return self._exportableenviron
2240 2253
2241 2254 @contextlib.contextmanager
2242 def configoverride(self, overrides, source=b""):
2255 def configoverride(self, overrides: _ConfigItems, source: bytes = b""):
2243 2256 """Context manager for temporary config overrides
2244 2257 `overrides` must be a dict of the following structure:
2245 2258 {(section, name) : value}"""
2246 2259 backups = {}
2247 2260 try:
2248 2261 for (section, name), value in overrides.items():
2249 2262 backups[(section, name)] = self.backupconfig(section, name)
2250 2263 self.setconfig(section, name, value, source)
2251 2264 yield
2252 2265 finally:
2253 2266 for __, backup in backups.items():
2254 2267 self.restoreconfig(backup)
2255 2268 # just restoring ui.quiet config to the previous value is not enough
2256 2269 # as it does not update ui.quiet class member
2257 2270 if (b'ui', b'quiet') in overrides:
2258 2271 self.fixconfig(section=b'ui')
2259 2272
2260 def estimatememory(self):
2273 def estimatememory(self) -> Optional[int]:
2261 2274 """Provide an estimate for the available system memory in Bytes.
2262 2275
2263 2276 This can be overriden via ui.available-memory. It returns None, if
2264 2277 no estimate can be computed.
2265 2278 """
2266 2279 value = self.config(b'ui', b'available-memory')
2267 2280 if value is not None:
2268 2281 try:
2269 2282 return util.sizetoint(value)
2270 2283 except error.ParseError:
2271 2284 raise error.ConfigError(
2272 2285 _(b"ui.available-memory value is invalid ('%s')") % value
2273 2286 )
2274 2287 return util._estimatememory()
2275 2288
2276 2289
2277 2290 # we instantiate one globally shared progress bar to avoid
2278 2291 # competing progress bars when multiple UI objects get created
2279 2292 _progresssingleton: Optional[progress.progbar] = None
2280 2293
2281 2294
2282 2295 def getprogbar(ui: ui) -> progress.progbar:
2283 2296 global _progresssingleton
2284 2297 if _progresssingleton is None:
2285 2298 # passing 'ui' object to the singleton is fishy,
2286 2299 # this is how the extension used to work but feel free to rework it.
2287 2300 _progresssingleton = progress.progbar(ui)
2288 2301 return _progresssingleton
2289 2302
2290 2303
2291 2304 def haveprogbar() -> bool:
2292 2305 return _progresssingleton is not None
2293 2306
2294 2307
2295 def _selectmsgdests(ui):
2308 def _selectmsgdests(ui: ui):
2296 2309 name = ui.config(b'ui', b'message-output')
2297 2310 if name == b'channel':
2298 2311 if ui.fmsg:
2299 2312 return ui.fmsg, ui.fmsg
2300 2313 else:
2301 2314 # fall back to ferr if channel isn't ready so that status/error
2302 2315 # messages can be printed
2303 2316 return ui.ferr, ui.ferr
2304 2317 if name == b'stdio':
2305 2318 return ui.fout, ui.ferr
2306 2319 if name == b'stderr':
2307 2320 return ui.ferr, ui.ferr
2308 2321 raise error.Abort(b'invalid ui.message-output destination: %s' % name)
2309 2322
2310 2323
2311 2324 def _writemsgwith(write, dest, *args: bytes, **opts: _MsgOpts) -> None:
2312 2325 """Write ui message with the given ui._write*() function
2313 2326
2314 2327 The specified message type is translated to 'ui.<type>' label if the dest
2315 2328 isn't a structured channel, so that the message will be colorized.
2316 2329 """
2317 2330 # TODO: maybe change 'type' to a mandatory option
2318 2331 if 'type' in opts and not getattr(dest, 'structured', False):
2319 2332 opts['label'] = opts.get('label', b'') + b' ui.%s' % opts.pop('type')
2320 2333 write(dest, *args, **opts)
General Comments 0
You need to be logged in to leave comments. Login now