##// END OF EJS Templates
typing: add type hints to the prompt methods in mercurial/ui.py...
Matt Harbison -
r50694:de284a0b default
parent child Browse files
Show More
@@ -1,2333 +1,2358 b''
1 1 # ui.py - user interface bits for mercurial
2 2 #
3 3 # Copyright 2005-2007 Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import collections
10 10 import contextlib
11 11 import datetime
12 12 import errno
13 13 import inspect
14 14 import os
15 15 import re
16 16 import signal
17 17 import socket
18 18 import subprocess
19 19 import sys
20 20 import traceback
21 21
22 22 from typing import (
23 23 Any,
24 24 Callable,
25 25 Dict,
26 26 List,
27 27 NoReturn,
28 28 Optional,
29 29 Tuple,
30 30 Type,
31 31 TypeVar,
32 32 Union,
33 33 cast,
34 overload,
34 35 )
35 36
36 37 from .i18n import _
37 38 from .node import hex
38 39 from .pycompat import (
39 40 getattr,
40 41 open,
41 42 )
42 43
43 44 from . import (
44 45 color,
45 46 config,
46 47 configitems,
47 48 encoding,
48 49 error,
49 50 formatter,
50 51 loggingutil,
51 52 progress,
52 53 pycompat,
53 54 rcutil,
54 55 scmutil,
55 56 util,
56 57 )
57 58 from .utils import (
58 59 dateutil,
59 60 procutil,
60 61 resourceutil,
61 62 stringutil,
62 63 urlutil,
63 64 )
64 65
65 66 _ConfigItems = Dict[Tuple[bytes, bytes], object] # {(section, name) : value}
66 67 # The **opts args of the various write() methods can be basically anything, but
67 68 # there's no way to express it as "anything but str". So type it to be the
68 69 # handful of known types that are used.
69 70 _MsgOpts = Union[bytes, bool, List["_PromptChoice"]]
70 71 _PromptChoice = Tuple[bytes, bytes]
71 72 _Tui = TypeVar('_Tui', bound="ui")
72 73
73 74 urlreq = util.urlreq
74 75
75 76 # for use with str.translate(None, _keepalnum), to keep just alphanumerics
76 77 _keepalnum: bytes = b''.join(
77 78 c for c in map(pycompat.bytechr, range(256)) if not c.isalnum()
78 79 )
79 80
80 81 # The config knobs that will be altered (if unset) by ui.tweakdefaults.
81 82 tweakrc: bytes = b"""
82 83 [ui]
83 84 # The rollback command is dangerous. As a rule, don't use it.
84 85 rollback = False
85 86 # Make `hg status` report copy information
86 87 statuscopies = yes
87 88 # Prefer curses UIs when available. Revert to plain-text with `text`.
88 89 interface = curses
89 90 # Make compatible commands emit cwd-relative paths by default.
90 91 relative-paths = yes
91 92
92 93 [commands]
93 94 # Grep working directory by default.
94 95 grep.all-files = True
95 96 # Refuse to perform an `hg update` that would cause a file content merge
96 97 update.check = noconflict
97 98 # Show conflicts information in `hg status`
98 99 status.verbose = True
99 100 # Make `hg resolve` with no action (like `-m`) fail instead of re-merging.
100 101 resolve.explicit-re-merge = True
101 102
102 103 [diff]
103 104 git = 1
104 105 showfunc = 1
105 106 word-diff = 1
106 107 """
107 108
108 109 samplehgrcs: Dict[bytes, bytes] = {
109 110 b'user': b"""# example user config (see 'hg help config' for more info)
110 111 [ui]
111 112 # name and email, e.g.
112 113 # username = Jane Doe <jdoe@example.com>
113 114 username =
114 115
115 116 # We recommend enabling tweakdefaults to get slight improvements to
116 117 # the UI over time. Make sure to set HGPLAIN in the environment when
117 118 # writing scripts!
118 119 # tweakdefaults = True
119 120
120 121 # uncomment to disable color in command output
121 122 # (see 'hg help color' for details)
122 123 # color = never
123 124
124 125 # uncomment to disable command output pagination
125 126 # (see 'hg help pager' for details)
126 127 # paginate = never
127 128
128 129 [extensions]
129 130 # uncomment the lines below to enable some popular extensions
130 131 # (see 'hg help extensions' for more info)
131 132 #
132 133 # histedit =
133 134 # rebase =
134 135 # uncommit =
135 136 """,
136 137 b'cloned': b"""# example repository config (see 'hg help config' for more info)
137 138 [paths]
138 139 default = %s
139 140
140 141 # path aliases to other clones of this repo in URLs or filesystem paths
141 142 # (see 'hg help config.paths' for more info)
142 143 #
143 144 # default:pushurl = ssh://jdoe@example.net/hg/jdoes-fork
144 145 # my-fork = ssh://jdoe@example.net/hg/jdoes-fork
145 146 # my-clone = /home/jdoe/jdoes-clone
146 147
147 148 [ui]
148 149 # name and email (local to this repository, optional), e.g.
149 150 # username = Jane Doe <jdoe@example.com>
150 151 """,
151 152 b'local': b"""# example repository config (see 'hg help config' for more info)
152 153 [paths]
153 154 # path aliases to other clones of this repo in URLs or filesystem paths
154 155 # (see 'hg help config.paths' for more info)
155 156 #
156 157 # default = http://example.com/hg/example-repo
157 158 # default:pushurl = ssh://jdoe@example.net/hg/jdoes-fork
158 159 # my-fork = ssh://jdoe@example.net/hg/jdoes-fork
159 160 # my-clone = /home/jdoe/jdoes-clone
160 161
161 162 [ui]
162 163 # name and email (local to this repository, optional), e.g.
163 164 # username = Jane Doe <jdoe@example.com>
164 165 """,
165 166 b'global': b"""# example system-wide hg config (see 'hg help config' for more info)
166 167
167 168 [ui]
168 169 # uncomment to disable color in command output
169 170 # (see 'hg help color' for details)
170 171 # color = never
171 172
172 173 # uncomment to disable command output pagination
173 174 # (see 'hg help pager' for details)
174 175 # paginate = never
175 176
176 177 [extensions]
177 178 # uncomment the lines below to enable some popular extensions
178 179 # (see 'hg help extensions' for more info)
179 180 #
180 181 # blackbox =
181 182 # churn =
182 183 """,
183 184 }
184 185
185 186
186 187 def _maybestrurl(maybebytes):
187 188 return pycompat.rapply(pycompat.strurl, maybebytes)
188 189
189 190
190 191 def _maybebytesurl(maybestr):
191 192 return pycompat.rapply(pycompat.bytesurl, maybestr)
192 193
193 194
194 195 class httppasswordmgrdbproxy:
195 196 """Delays loading urllib2 until it's needed."""
196 197
197 198 def __init__(self) -> None:
198 199 self._mgr = None
199 200
200 201 def _get_mgr(self):
201 202 if self._mgr is None:
202 203 self._mgr = urlreq.httppasswordmgrwithdefaultrealm()
203 204 return self._mgr
204 205
205 206 def add_password(self, realm, uris, user, passwd):
206 207 return self._get_mgr().add_password(
207 208 _maybestrurl(realm),
208 209 _maybestrurl(uris),
209 210 _maybestrurl(user),
210 211 _maybestrurl(passwd),
211 212 )
212 213
213 214 def find_user_password(self, realm, uri):
214 215 mgr = self._get_mgr()
215 216 return _maybebytesurl(
216 217 mgr.find_user_password(_maybestrurl(realm), _maybestrurl(uri))
217 218 )
218 219
219 220
220 221 def _catchterm(*args) -> NoReturn:
221 222 raise error.SignalInterrupt
222 223
223 224
224 225 # unique object used to detect no default value has been provided when
225 226 # retrieving configuration value.
226 227 _unset = object()
227 228
228 229 # _reqexithandlers: callbacks run at the end of a request
229 230 _reqexithandlers: List = []
230 231
231 232
232 233 class ui:
233 234 def __init__(self, src: Optional["ui"] = None) -> None:
234 235 """Create a fresh new ui object if no src given
235 236
236 237 Use uimod.ui.load() to create a ui which knows global and user configs.
237 238 In most cases, you should use ui.copy() to create a copy of an existing
238 239 ui object.
239 240 """
240 241 # _buffers: used for temporary capture of output
241 242 self._buffers = []
242 243 # 3-tuple describing how each buffer in the stack behaves.
243 244 # Values are (capture stderr, capture subprocesses, apply labels).
244 245 self._bufferstates = []
245 246 # When a buffer is active, defines whether we are expanding labels.
246 247 # This exists to prevent an extra list lookup.
247 248 self._bufferapplylabels = None
248 249 self.quiet = self.verbose = self.debugflag = self.tracebackflag = False
249 250 self._reportuntrusted = True
250 251 self._knownconfig = configitems.coreitems
251 252 self._ocfg = config.config() # overlay
252 253 self._tcfg = config.config() # trusted
253 254 self._ucfg = config.config() # untrusted
254 255 self._trustusers = set()
255 256 self._trustgroups = set()
256 257 self.callhooks = True
257 258 # hold the root to use for each [paths] entry
258 259 self._path_to_root = {}
259 260 # Insecure server connections requested.
260 261 self.insecureconnections = False
261 262 # Blocked time
262 263 self.logblockedtimes = False
263 264 # color mode: see mercurial/color.py for possible value
264 265 self._colormode = None
265 266 self._terminfoparams = {}
266 267 self._styles = {}
267 268 self._uninterruptible = False
268 269 self.showtimestamp = False
269 270
270 271 if src:
271 272 self._fout = src._fout
272 273 self._ferr = src._ferr
273 274 self._fin = src._fin
274 275 self._fmsg = src._fmsg
275 276 self._fmsgout = src._fmsgout
276 277 self._fmsgerr = src._fmsgerr
277 278 self._finoutredirected = src._finoutredirected
278 279 self._loggers = src._loggers.copy()
279 280 self.pageractive = src.pageractive
280 281 self._disablepager = src._disablepager
281 282 self._tweaked = src._tweaked
282 283
283 284 self._tcfg = src._tcfg.copy()
284 285 self._ucfg = src._ucfg.copy()
285 286 self._ocfg = src._ocfg.copy()
286 287 self._trustusers = src._trustusers.copy()
287 288 self._trustgroups = src._trustgroups.copy()
288 289 self.environ = src.environ
289 290 self.callhooks = src.callhooks
290 291 self._path_to_root = src._path_to_root
291 292 self.insecureconnections = src.insecureconnections
292 293 self._colormode = src._colormode
293 294 self._terminfoparams = src._terminfoparams.copy()
294 295 self._styles = src._styles.copy()
295 296
296 297 self.fixconfig()
297 298
298 299 self.httppasswordmgrdb = src.httppasswordmgrdb
299 300 self._blockedtimes = src._blockedtimes
300 301 else:
301 302 self._fout = procutil.stdout
302 303 self._ferr = procutil.stderr
303 304 self._fin = procutil.stdin
304 305 self._fmsg = None
305 306 self._fmsgout = self.fout # configurable
306 307 self._fmsgerr = self.ferr # configurable
307 308 self._finoutredirected = False
308 309 self._loggers = {}
309 310 self.pageractive = False
310 311 self._disablepager = False
311 312 self._tweaked = False
312 313
313 314 # shared read-only environment
314 315 self.environ = encoding.environ
315 316
316 317 self.httppasswordmgrdb = httppasswordmgrdbproxy()
317 318 self._blockedtimes = collections.defaultdict(int)
318 319
319 320 allowed = self.configlist(b'experimental', b'exportableenviron')
320 321 if b'*' in allowed:
321 322 self._exportableenviron = self.environ
322 323 else:
323 324 self._exportableenviron = {}
324 325 for k in allowed:
325 326 if k in self.environ:
326 327 self._exportableenviron[k] = self.environ[k]
327 328
328 329 def _new_source(self) -> None:
329 330 self._ocfg.new_source()
330 331 self._tcfg.new_source()
331 332 self._ucfg.new_source()
332 333
333 334 @classmethod
334 335 def load(cls: Type[_Tui]) -> _Tui:
335 336 """Create a ui and load global and user configs"""
336 337 u = cls()
337 338 # we always trust global config files and environment variables
338 339 for t, f in rcutil.rccomponents():
339 340 if t == b'path':
340 341 u.readconfig(f, trust=True)
341 342 elif t == b'resource':
342 343 u.read_resource_config(f, trust=True)
343 344 elif t == b'items':
344 345 u._new_source()
345 346 sections = set()
346 347 for section, name, value, source in f:
347 348 # do not set u._ocfg
348 349 # XXX clean this up once immutable config object is a thing
349 350 u._tcfg.set(section, name, value, source)
350 351 u._ucfg.set(section, name, value, source)
351 352 sections.add(section)
352 353 for section in sections:
353 354 u.fixconfig(section=section)
354 355 else:
355 356 raise error.ProgrammingError(b'unknown rctype: %s' % t)
356 357 u._maybetweakdefaults()
357 358 u._new_source() # anything after that is a different level
358 359 return u
359 360
360 361 def _maybetweakdefaults(self) -> None:
361 362 if not self.configbool(b'ui', b'tweakdefaults'):
362 363 return
363 364 if self._tweaked or self.plain(b'tweakdefaults'):
364 365 return
365 366
366 367 # Note: it is SUPER IMPORTANT that you set self._tweaked to
367 368 # True *before* any calls to setconfig(), otherwise you'll get
368 369 # infinite recursion between setconfig and this method.
369 370 #
370 371 # TODO: We should extract an inner method in setconfig() to
371 372 # avoid this weirdness.
372 373 self._tweaked = True
373 374 tmpcfg = config.config()
374 375 tmpcfg.parse(b'<tweakdefaults>', tweakrc)
375 376 for section in tmpcfg:
376 377 for name, value in tmpcfg.items(section):
377 378 if not self.hasconfig(section, name):
378 379 self.setconfig(section, name, value, b"<tweakdefaults>")
379 380
380 381 def copy(self: _Tui) -> _Tui:
381 382 return self.__class__(self)
382 383
383 384 def resetstate(self) -> None:
384 385 """Clear internal state that shouldn't persist across commands"""
385 386 if self._progbar:
386 387 self._progbar.resetstate() # reset last-print time of progress bar
387 388 self.httppasswordmgrdb = httppasswordmgrdbproxy()
388 389
389 390 @contextlib.contextmanager
390 391 def timeblockedsection(self, key: bytes):
391 392 # this is open-coded below - search for timeblockedsection to find them
392 393 starttime = util.timer()
393 394 try:
394 395 yield
395 396 finally:
396 397 self._blockedtimes[key + b'_blocked'] += (
397 398 util.timer() - starttime
398 399 ) * 1000
399 400
400 401 @contextlib.contextmanager
401 402 def uninterruptible(self):
402 403 """Mark an operation as unsafe.
403 404
404 405 Most operations on a repository are safe to interrupt, but a
405 406 few are risky (for example repair.strip). This context manager
406 407 lets you advise Mercurial that something risky is happening so
407 408 that control-C etc can be blocked if desired.
408 409 """
409 410 enabled = self.configbool(b'experimental', b'nointerrupt')
410 411 if enabled and self.configbool(
411 412 b'experimental', b'nointerrupt-interactiveonly'
412 413 ):
413 414 enabled = self.interactive()
414 415 if self._uninterruptible or not enabled:
415 416 # if nointerrupt support is turned off, the process isn't
416 417 # interactive, or we're already in an uninterruptible
417 418 # block, do nothing.
418 419 yield
419 420 return
420 421
421 422 def warn():
422 423 self.warn(_(b"shutting down cleanly\n"))
423 424 self.warn(
424 425 _(b"press ^C again to terminate immediately (dangerous)\n")
425 426 )
426 427 return True
427 428
428 429 with procutil.uninterruptible(warn):
429 430 try:
430 431 self._uninterruptible = True
431 432 yield
432 433 finally:
433 434 self._uninterruptible = False
434 435
435 436 def formatter(self, topic: bytes, opts):
436 437 return formatter.formatter(self, self, topic, opts)
437 438
438 439 def _trusted(self, fp, f: bytes) -> bool:
439 440 st = util.fstat(fp)
440 441 if util.isowner(st):
441 442 return True
442 443
443 444 tusers, tgroups = self._trustusers, self._trustgroups
444 445 if b'*' in tusers or b'*' in tgroups:
445 446 return True
446 447
447 448 user = util.username(st.st_uid)
448 449 group = util.groupname(st.st_gid)
449 450 if user in tusers or group in tgroups or user == util.username():
450 451 return True
451 452
452 453 if self._reportuntrusted:
453 454 self.warn(
454 455 _(
455 456 b'not trusting file %s from untrusted '
456 457 b'user %s, group %s\n'
457 458 )
458 459 % (f, user, group)
459 460 )
460 461 return False
461 462
462 463 def read_resource_config(
463 464 self, name, root=None, trust=False, sections=None, remap=None
464 465 ) -> None:
465 466 try:
466 467 fp = resourceutil.open_resource(name[0], name[1])
467 468 except IOError:
468 469 if not sections: # ignore unless we were looking for something
469 470 return
470 471 raise
471 472
472 473 self._readconfig(
473 474 b'resource:%s.%s' % name, fp, root, trust, sections, remap
474 475 )
475 476
476 477 def readconfig(
477 478 self, filename, root=None, trust=False, sections=None, remap=None
478 479 ) -> None:
479 480 try:
480 481 fp = open(filename, 'rb')
481 482 except IOError:
482 483 if not sections: # ignore unless we were looking for something
483 484 return
484 485 raise
485 486
486 487 self._readconfig(filename, fp, root, trust, sections, remap)
487 488
488 489 def _readconfig(
489 490 self, filename, fp, root=None, trust=False, sections=None, remap=None
490 491 ) -> None:
491 492 with fp:
492 493 cfg = config.config()
493 494 trusted = sections or trust or self._trusted(fp, filename)
494 495
495 496 try:
496 497 cfg.read(filename, fp, sections=sections, remap=remap)
497 498 except error.ConfigError as inst:
498 499 if trusted:
499 500 raise
500 501 self.warn(
501 502 _(b'ignored %s: %s\n') % (inst.location, inst.message)
502 503 )
503 504
504 505 self._applyconfig(cfg, trusted, root)
505 506
506 507 def applyconfig(
507 508 self, configitems: _ConfigItems, source=b"", root=None
508 509 ) -> None:
509 510 """Add configitems from a non-file source. Unlike with ``setconfig()``,
510 511 they can be overridden by subsequent config file reads. The items are
511 512 in the same format as ``configoverride()``, namely a dict of the
512 513 following structures: {(section, name) : value}
513 514
514 515 Typically this is used by extensions that inject themselves into the
515 516 config file load procedure by monkeypatching ``localrepo.loadhgrc()``.
516 517 """
517 518 cfg = config.config()
518 519
519 520 for (section, name), value in configitems.items():
520 521 cfg.set(section, name, value, source)
521 522
522 523 self._applyconfig(cfg, True, root)
523 524
524 525 def _applyconfig(self, cfg, trusted, root) -> None:
525 526 if self.plain():
526 527 for k in (
527 528 b'debug',
528 529 b'fallbackencoding',
529 530 b'quiet',
530 531 b'slash',
531 532 b'logtemplate',
532 533 b'message-output',
533 534 b'statuscopies',
534 535 b'style',
535 536 b'traceback',
536 537 b'verbose',
537 538 ):
538 539 if k in cfg[b'ui']:
539 540 del cfg[b'ui'][k]
540 541 for k, v in cfg.items(b'defaults'):
541 542 del cfg[b'defaults'][k]
542 543 for k, v in cfg.items(b'commands'):
543 544 del cfg[b'commands'][k]
544 545 for k, v in cfg.items(b'command-templates'):
545 546 del cfg[b'command-templates'][k]
546 547 # Don't remove aliases from the configuration if in the exceptionlist
547 548 if self.plain(b'alias'):
548 549 for k, v in cfg.items(b'alias'):
549 550 del cfg[b'alias'][k]
550 551 if self.plain(b'revsetalias'):
551 552 for k, v in cfg.items(b'revsetalias'):
552 553 del cfg[b'revsetalias'][k]
553 554 if self.plain(b'templatealias'):
554 555 for k, v in cfg.items(b'templatealias'):
555 556 del cfg[b'templatealias'][k]
556 557
557 558 if trusted:
558 559 self._tcfg.update(cfg)
559 560 self._tcfg.update(self._ocfg)
560 561 self._ucfg.update(cfg)
561 562 self._ucfg.update(self._ocfg)
562 563
563 564 if root is None:
564 565 root = os.path.expanduser(b'~')
565 566 self.fixconfig(root=root)
566 567
567 568 def fixconfig(self, root=None, section=None) -> None:
568 569 if section in (None, b'paths'):
569 570 # expand vars and ~
570 571 # translate paths relative to root (or home) into absolute paths
571 572 root = root or encoding.getcwd()
572 573 for c in self._tcfg, self._ucfg, self._ocfg:
573 574 for n, p in c.items(b'paths'):
574 575 old_p = p
575 576 s = self.configsource(b'paths', n) or b'none'
576 577 root_key = (n, p, s)
577 578 if root_key not in self._path_to_root:
578 579 self._path_to_root[root_key] = root
579 580 # Ignore sub-options.
580 581 if b':' in n:
581 582 continue
582 583 if not p:
583 584 continue
584 585 if b'%%' in p:
585 586 if s is None:
586 587 s = 'none'
587 588 self.warn(
588 589 _(b"(deprecated '%%' in path %s=%s from %s)\n")
589 590 % (n, p, s)
590 591 )
591 592 p = p.replace(b'%%', b'%')
592 593 if p != old_p:
593 594 c.alter(b"paths", n, p)
594 595
595 596 if section in (None, b'ui'):
596 597 # update ui options
597 598 self._fmsgout, self._fmsgerr = _selectmsgdests(self)
598 599 self.debugflag = self.configbool(b'ui', b'debug')
599 600 self.verbose = self.debugflag or self.configbool(b'ui', b'verbose')
600 601 self.quiet = not self.debugflag and self.configbool(b'ui', b'quiet')
601 602 if self.verbose and self.quiet:
602 603 self.quiet = self.verbose = False
603 604 self._reportuntrusted = self.debugflag or self.configbool(
604 605 b"ui", b"report_untrusted"
605 606 )
606 607 self.showtimestamp = self.configbool(b'ui', b'timestamp-output')
607 608 self.tracebackflag = self.configbool(b'ui', b'traceback')
608 609 self.logblockedtimes = self.configbool(b'ui', b'logblockedtimes')
609 610
610 611 if section in (None, b'trusted'):
611 612 # update trust information
612 613 self._trustusers.update(self.configlist(b'trusted', b'users'))
613 614 self._trustgroups.update(self.configlist(b'trusted', b'groups'))
614 615
615 616 if section in (None, b'devel', b'ui') and self.debugflag:
616 617 tracked = set()
617 618 if self.configbool(b'devel', b'debug.extensions'):
618 619 tracked.add(b'extension')
619 620 if tracked:
620 621 logger = loggingutil.fileobjectlogger(self._ferr, tracked)
621 622 self.setlogger(b'debug', logger)
622 623
623 624 def backupconfig(self, section, item):
624 625 return (
625 626 self._ocfg.backup(section, item),
626 627 self._tcfg.backup(section, item),
627 628 self._ucfg.backup(section, item),
628 629 )
629 630
630 631 def restoreconfig(self, data) -> None:
631 632 self._ocfg.restore(data[0])
632 633 self._tcfg.restore(data[1])
633 634 self._ucfg.restore(data[2])
634 635
635 636 def setconfig(self, section, name, value, source=b'') -> None:
636 637 for cfg in (self._ocfg, self._tcfg, self._ucfg):
637 638 cfg.set(section, name, value, source)
638 639 self.fixconfig(section=section)
639 640 self._maybetweakdefaults()
640 641
641 642 def _data(self, untrusted):
642 643 return untrusted and self._ucfg or self._tcfg
643 644
644 645 def configsource(self, section, name, untrusted=False):
645 646 return self._data(untrusted).source(section, name)
646 647
647 648 def config(self, section, name, default=_unset, untrusted=False):
648 649 """return the plain string version of a config"""
649 650 value = self._config(
650 651 section, name, default=default, untrusted=untrusted
651 652 )
652 653 if value is _unset:
653 654 return None
654 655 return value
655 656
656 657 def _config(self, section, name, default=_unset, untrusted=False):
657 658 value = itemdefault = default
658 659 item = self._knownconfig.get(section, {}).get(name)
659 660 alternates = [(section, name)]
660 661
661 662 if item is not None:
662 663 alternates.extend(item.alias)
663 664 if callable(item.default):
664 665 itemdefault = item.default()
665 666 else:
666 667 itemdefault = item.default
667 668 else:
668 669 msg = b"accessing unregistered config item: '%s.%s'"
669 670 msg %= (section, name)
670 671 self.develwarn(msg, 2, b'warn-config-unknown')
671 672
672 673 if default is _unset:
673 674 if item is None:
674 675 value = default
675 676 elif item.default is configitems.dynamicdefault:
676 677 value = None
677 678 msg = b"config item requires an explicit default value: '%s.%s'"
678 679 msg %= (section, name)
679 680 self.develwarn(msg, 2, b'warn-config-default')
680 681 else:
681 682 value = itemdefault
682 683 elif (
683 684 item is not None
684 685 and item.default is not configitems.dynamicdefault
685 686 and default != itemdefault
686 687 ):
687 688 msg = (
688 689 b"specifying a mismatched default value for a registered "
689 690 b"config item: '%s.%s' '%s'"
690 691 )
691 692 msg %= (section, name, pycompat.bytestr(default))
692 693 self.develwarn(msg, 2, b'warn-config-default')
693 694
694 695 candidates = []
695 696 config = self._data(untrusted)
696 697 for s, n in alternates:
697 698 candidate = config.get(s, n, None)
698 699 if candidate is not None:
699 700 candidates.append((s, n, candidate))
700 701 if candidates:
701 702
702 703 def level(x):
703 704 return config.level(x[0], x[1])
704 705
705 706 value = max(candidates, key=level)[2]
706 707
707 708 if self.debugflag and not untrusted and self._reportuntrusted:
708 709 for s, n in alternates:
709 710 uvalue = self._ucfg.get(s, n)
710 711 if uvalue is not None and uvalue != value:
711 712 self.debug(
712 713 b"ignoring untrusted configuration option "
713 714 b"%s.%s = %s\n" % (s, n, uvalue)
714 715 )
715 716 return value
716 717
717 718 def config_default(self, section, name):
718 719 """return the default value for a config option
719 720
720 721 The default is returned "raw", for example if it is a callable, the
721 722 callable was not called.
722 723 """
723 724 item = self._knownconfig.get(section, {}).get(name)
724 725
725 726 if item is None:
726 727 raise KeyError((section, name))
727 728 return item.default
728 729
729 730 def configsuboptions(self, section, name, default=_unset, untrusted=False):
730 731 """Get a config option and all sub-options.
731 732
732 733 Some config options have sub-options that are declared with the
733 734 format "key:opt = value". This method is used to return the main
734 735 option and all its declared sub-options.
735 736
736 737 Returns a 2-tuple of ``(option, sub-options)``, where `sub-options``
737 738 is a dict of defined sub-options where keys and values are strings.
738 739 """
739 740 main = self.config(section, name, default, untrusted=untrusted)
740 741 data = self._data(untrusted)
741 742 sub = {}
742 743 prefix = b'%s:' % name
743 744 for k, v in data.items(section):
744 745 if k.startswith(prefix):
745 746 sub[k[len(prefix) :]] = v
746 747
747 748 if self.debugflag and not untrusted and self._reportuntrusted:
748 749 for k, v in sub.items():
749 750 uvalue = self._ucfg.get(section, b'%s:%s' % (name, k))
750 751 if uvalue is not None and uvalue != v:
751 752 self.debug(
752 753 b'ignoring untrusted configuration option '
753 754 b'%s:%s.%s = %s\n' % (section, name, k, uvalue)
754 755 )
755 756
756 757 return main, sub
757 758
758 759 def configpath(self, section, name, default=_unset, untrusted=False):
759 760 """get a path config item, expanded relative to repo root or config
760 761 file"""
761 762 v = self.config(section, name, default, untrusted)
762 763 if v is None:
763 764 return None
764 765 if not os.path.isabs(v) or b"://" not in v:
765 766 src = self.configsource(section, name, untrusted)
766 767 if b':' in src:
767 768 base = os.path.dirname(src.rsplit(b':')[0])
768 769 v = os.path.join(base, os.path.expanduser(v))
769 770 return v
770 771
771 772 def configbool(self, section, name, default=_unset, untrusted=False):
772 773 """parse a configuration element as a boolean
773 774
774 775 >>> u = ui(); s = b'foo'
775 776 >>> u.setconfig(s, b'true', b'yes')
776 777 >>> u.configbool(s, b'true')
777 778 True
778 779 >>> u.setconfig(s, b'false', b'no')
779 780 >>> u.configbool(s, b'false')
780 781 False
781 782 >>> u.configbool(s, b'unknown')
782 783 False
783 784 >>> u.configbool(s, b'unknown', True)
784 785 True
785 786 >>> u.setconfig(s, b'invalid', b'somevalue')
786 787 >>> u.configbool(s, b'invalid')
787 788 Traceback (most recent call last):
788 789 ...
789 790 ConfigError: foo.invalid is not a boolean ('somevalue')
790 791 """
791 792
792 793 v = self._config(section, name, default, untrusted=untrusted)
793 794 if v is None:
794 795 return v
795 796 if v is _unset:
796 797 if default is _unset:
797 798 return False
798 799 return default
799 800 if isinstance(v, bool):
800 801 return v
801 802 b = stringutil.parsebool(v)
802 803 if b is None:
803 804 raise error.ConfigError(
804 805 _(b"%s.%s is not a boolean ('%s')") % (section, name, v)
805 806 )
806 807 return b
807 808
808 809 def configwith(
809 810 self, convert, section, name, default=_unset, desc=None, untrusted=False
810 811 ):
811 812 """parse a configuration element with a conversion function
812 813
813 814 >>> u = ui(); s = b'foo'
814 815 >>> u.setconfig(s, b'float1', b'42')
815 816 >>> u.configwith(float, s, b'float1')
816 817 42.0
817 818 >>> u.setconfig(s, b'float2', b'-4.25')
818 819 >>> u.configwith(float, s, b'float2')
819 820 -4.25
820 821 >>> u.configwith(float, s, b'unknown', 7)
821 822 7.0
822 823 >>> u.setconfig(s, b'invalid', b'somevalue')
823 824 >>> u.configwith(float, s, b'invalid')
824 825 Traceback (most recent call last):
825 826 ...
826 827 ConfigError: foo.invalid is not a valid float ('somevalue')
827 828 >>> u.configwith(float, s, b'invalid', desc=b'womble')
828 829 Traceback (most recent call last):
829 830 ...
830 831 ConfigError: foo.invalid is not a valid womble ('somevalue')
831 832 """
832 833
833 834 v = self.config(section, name, default, untrusted)
834 835 if v is None:
835 836 return v # do not attempt to convert None
836 837 try:
837 838 return convert(v)
838 839 except (ValueError, error.ParseError):
839 840 if desc is None:
840 841 desc = pycompat.sysbytes(convert.__name__)
841 842 raise error.ConfigError(
842 843 _(b"%s.%s is not a valid %s ('%s')") % (section, name, desc, v)
843 844 )
844 845
845 846 def configint(self, section, name, default=_unset, untrusted=False):
846 847 """parse a configuration element as an integer
847 848
848 849 >>> u = ui(); s = b'foo'
849 850 >>> u.setconfig(s, b'int1', b'42')
850 851 >>> u.configint(s, b'int1')
851 852 42
852 853 >>> u.setconfig(s, b'int2', b'-42')
853 854 >>> u.configint(s, b'int2')
854 855 -42
855 856 >>> u.configint(s, b'unknown', 7)
856 857 7
857 858 >>> u.setconfig(s, b'invalid', b'somevalue')
858 859 >>> u.configint(s, b'invalid')
859 860 Traceback (most recent call last):
860 861 ...
861 862 ConfigError: foo.invalid is not a valid integer ('somevalue')
862 863 """
863 864
864 865 return self.configwith(
865 866 int, section, name, default, b'integer', untrusted
866 867 )
867 868
868 869 def configbytes(self, section, name, default=_unset, untrusted=False):
869 870 """parse a configuration element as a quantity in bytes
870 871
871 872 Units can be specified as b (bytes), k or kb (kilobytes), m or
872 873 mb (megabytes), g or gb (gigabytes).
873 874
874 875 >>> u = ui(); s = b'foo'
875 876 >>> u.setconfig(s, b'val1', b'42')
876 877 >>> u.configbytes(s, b'val1')
877 878 42
878 879 >>> u.setconfig(s, b'val2', b'42.5 kb')
879 880 >>> u.configbytes(s, b'val2')
880 881 43520
881 882 >>> u.configbytes(s, b'unknown', b'7 MB')
882 883 7340032
883 884 >>> u.setconfig(s, b'invalid', b'somevalue')
884 885 >>> u.configbytes(s, b'invalid')
885 886 Traceback (most recent call last):
886 887 ...
887 888 ConfigError: foo.invalid is not a byte quantity ('somevalue')
888 889 """
889 890
890 891 value = self._config(section, name, default, untrusted)
891 892 if value is _unset:
892 893 if default is _unset:
893 894 default = 0
894 895 value = default
895 896 if not isinstance(value, bytes):
896 897 return value
897 898 try:
898 899 return util.sizetoint(value)
899 900 except error.ParseError:
900 901 raise error.ConfigError(
901 902 _(b"%s.%s is not a byte quantity ('%s')")
902 903 % (section, name, value)
903 904 )
904 905
905 906 def configlist(self, section, name, default=_unset, untrusted=False):
906 907 """parse a configuration element as a list of comma/space separated
907 908 strings
908 909
909 910 >>> u = ui(); s = b'foo'
910 911 >>> u.setconfig(s, b'list1', b'this,is "a small" ,test')
911 912 >>> u.configlist(s, b'list1')
912 913 ['this', 'is', 'a small', 'test']
913 914 >>> u.setconfig(s, b'list2', b'this, is "a small" , test ')
914 915 >>> u.configlist(s, b'list2')
915 916 ['this', 'is', 'a small', 'test']
916 917 """
917 918 # default is not always a list
918 919 v = self.configwith(
919 920 stringutil.parselist, section, name, default, b'list', untrusted
920 921 )
921 922 if isinstance(v, bytes):
922 923 return stringutil.parselist(v)
923 924 elif v is None:
924 925 return []
925 926 return v
926 927
927 928 def configdate(self, section, name, default=_unset, untrusted=False):
928 929 """parse a configuration element as a tuple of ints
929 930
930 931 >>> u = ui(); s = b'foo'
931 932 >>> u.setconfig(s, b'date', b'0 0')
932 933 >>> u.configdate(s, b'date')
933 934 (0, 0)
934 935 """
935 936 if self.config(section, name, default, untrusted):
936 937 return self.configwith(
937 938 dateutil.parsedate, section, name, default, b'date', untrusted
938 939 )
939 940 if default is _unset:
940 941 return None
941 942 return default
942 943
943 944 def configdefault(self, section, name):
944 945 """returns the default value of the config item"""
945 946 item = self._knownconfig.get(section, {}).get(name)
946 947 itemdefault = None
947 948 if item is not None:
948 949 if callable(item.default):
949 950 itemdefault = item.default()
950 951 else:
951 952 itemdefault = item.default
952 953 return itemdefault
953 954
954 955 def hasconfig(self, section, name, untrusted=False):
955 956 return self._data(untrusted).hasitem(section, name)
956 957
957 958 def has_section(self, section, untrusted=False):
958 959 '''tell whether section exists in config.'''
959 960 return section in self._data(untrusted)
960 961
961 962 def configitems(self, section, untrusted=False, ignoresub=False):
962 963 items = self._data(untrusted).items(section)
963 964 if ignoresub:
964 965 items = [i for i in items if b':' not in i[0]]
965 966 if self.debugflag and not untrusted and self._reportuntrusted:
966 967 for k, v in self._ucfg.items(section):
967 968 if self._tcfg.get(section, k) != v:
968 969 self.debug(
969 970 b"ignoring untrusted configuration option "
970 971 b"%s.%s = %s\n" % (section, k, v)
971 972 )
972 973 return items
973 974
974 975 def walkconfig(self, untrusted=False, all_known=False):
975 976 defined = self._walk_config(untrusted)
976 977 if not all_known:
977 978 for d in defined:
978 979 yield d
979 980 return
980 981 known = self._walk_known()
981 982 current_defined = next(defined, None)
982 983 current_known = next(known, None)
983 984 while current_defined is not None or current_known is not None:
984 985 if current_defined is None:
985 986 yield current_known
986 987 current_known = next(known, None)
987 988 elif current_known is None:
988 989 yield current_defined
989 990 current_defined = next(defined, None)
990 991 elif current_known[0:2] == current_defined[0:2]:
991 992 yield current_defined
992 993 current_defined = next(defined, None)
993 994 current_known = next(known, None)
994 995 elif current_known[0:2] < current_defined[0:2]:
995 996 yield current_known
996 997 current_known = next(known, None)
997 998 else:
998 999 yield current_defined
999 1000 current_defined = next(defined, None)
1000 1001
1001 1002 def _walk_known(self):
1002 1003 for section, items in sorted(self._knownconfig.items()):
1003 1004 for k, i in sorted(items.items()):
1004 1005 # We don't have a way to display generic well, so skip them
1005 1006 if i.generic:
1006 1007 continue
1007 1008 if callable(i.default):
1008 1009 default = i.default()
1009 1010 elif i.default is configitems.dynamicdefault:
1010 1011 default = b'<DYNAMIC>'
1011 1012 else:
1012 1013 default = i.default
1013 1014 yield section, i.name, default
1014 1015
1015 1016 def _walk_config(self, untrusted):
1016 1017 cfg = self._data(untrusted)
1017 1018 for section in cfg.sections():
1018 1019 for name, value in self.configitems(section, untrusted):
1019 1020 yield section, name, value
1020 1021
1021 1022 def plain(self, feature: Optional[bytes] = None) -> bool:
1022 1023 """is plain mode active?
1023 1024
1024 1025 Plain mode means that all configuration variables which affect
1025 1026 the behavior and output of Mercurial should be
1026 1027 ignored. Additionally, the output should be stable,
1027 1028 reproducible and suitable for use in scripts or applications.
1028 1029
1029 1030 The only way to trigger plain mode is by setting either the
1030 1031 `HGPLAIN' or `HGPLAINEXCEPT' environment variables.
1031 1032
1032 1033 The return value can either be
1033 1034 - False if HGPLAIN is not set, or feature is in HGPLAINEXCEPT
1034 1035 - False if feature is disabled by default and not included in HGPLAIN
1035 1036 - True otherwise
1036 1037 """
1037 1038 if (
1038 1039 b'HGPLAIN' not in encoding.environ
1039 1040 and b'HGPLAINEXCEPT' not in encoding.environ
1040 1041 ):
1041 1042 return False
1042 1043 exceptions = (
1043 1044 encoding.environ.get(b'HGPLAINEXCEPT', b'').strip().split(b',')
1044 1045 )
1045 1046 # TODO: add support for HGPLAIN=+feature,-feature syntax
1046 1047 if b'+strictflags' not in encoding.environ.get(b'HGPLAIN', b'').split(
1047 1048 b','
1048 1049 ):
1049 1050 exceptions.append(b'strictflags')
1050 1051 if feature and exceptions:
1051 1052 return feature not in exceptions
1052 1053 return True
1053 1054
1054 1055 def username(self, acceptempty=False):
1055 1056 """Return default username to be used in commits.
1056 1057
1057 1058 Searched in this order: $HGUSER, [ui] section of hgrcs, $EMAIL
1058 1059 and stop searching if one of these is set.
1059 1060 If not found and acceptempty is True, returns None.
1060 1061 If not found and ui.askusername is True, ask the user, else use
1061 1062 ($LOGNAME or $USER or $LNAME or $USERNAME) + "@full.hostname".
1062 1063 If no username could be found, raise an Abort error.
1063 1064 """
1064 1065 user = encoding.environ.get(b"HGUSER")
1065 1066 if user is None:
1066 1067 user = self.config(b"ui", b"username")
1067 1068 if user is not None:
1068 1069 user = os.path.expandvars(user)
1069 1070 if user is None:
1070 1071 user = encoding.environ.get(b"EMAIL")
1071 1072 if user is None and acceptempty:
1072 1073 return user
1073 1074 if user is None and self.configbool(b"ui", b"askusername"):
1074 1075 user = self.prompt(_(b"enter a commit username:"), default=None)
1075 1076 if user is None and not self.interactive():
1076 1077 try:
1077 1078 user = b'%s@%s' % (
1078 1079 procutil.getuser(),
1079 1080 encoding.strtolocal(socket.getfqdn()),
1080 1081 )
1081 1082 self.warn(_(b"no username found, using '%s' instead\n") % user)
1082 1083 except KeyError:
1083 1084 pass
1084 1085 if not user:
1085 1086 raise error.Abort(
1086 1087 _(b'no username supplied'),
1087 1088 hint=_(b"use 'hg config --edit' " b'to set your username'),
1088 1089 )
1089 1090 if b"\n" in user:
1090 1091 raise error.Abort(
1091 1092 _(b"username %r contains a newline\n") % pycompat.bytestr(user)
1092 1093 )
1093 1094 return user
1094 1095
1095 1096 def shortuser(self, user: bytes) -> bytes:
1096 1097 """Return a short representation of a user name or email address."""
1097 1098 if not self.verbose:
1098 1099 user = stringutil.shortuser(user)
1099 1100 return user
1100 1101
1101 1102 def expandpath(self, loc, default=None):
1102 1103 """Return repository location relative to cwd or from [paths]"""
1103 1104 msg = b'ui.expandpath is deprecated, use `get_*` functions from urlutil'
1104 1105 self.deprecwarn(msg, b'6.0')
1105 1106 try:
1106 1107 p = self.getpath(loc)
1107 1108 if p:
1108 1109 return p.rawloc
1109 1110 except error.RepoError:
1110 1111 pass
1111 1112
1112 1113 if default:
1113 1114 try:
1114 1115 p = self.getpath(default)
1115 1116 if p:
1116 1117 return p.rawloc
1117 1118 except error.RepoError:
1118 1119 pass
1119 1120
1120 1121 return loc
1121 1122
1122 1123 @util.propertycache
1123 1124 def paths(self):
1124 1125 return urlutil.paths(self)
1125 1126
1126 1127 def getpath(self, *args, **kwargs):
1127 1128 """see paths.getpath for details
1128 1129
1129 1130 This method exist as `getpath` need a ui for potential warning message.
1130 1131 """
1131 1132 msg = b'ui.getpath is deprecated, use `get_*` functions from urlutil'
1132 1133 self.deprecwarn(msg, b'6.0')
1133 1134 return self.paths.getpath(self, *args, **kwargs)
1134 1135
1135 1136 @property
1136 1137 def fout(self):
1137 1138 return self._fout
1138 1139
1139 1140 @fout.setter
1140 1141 def fout(self, f):
1141 1142 self._fout = f
1142 1143 self._fmsgout, self._fmsgerr = _selectmsgdests(self)
1143 1144
1144 1145 @property
1145 1146 def ferr(self):
1146 1147 return self._ferr
1147 1148
1148 1149 @ferr.setter
1149 1150 def ferr(self, f):
1150 1151 self._ferr = f
1151 1152 self._fmsgout, self._fmsgerr = _selectmsgdests(self)
1152 1153
1153 1154 @property
1154 1155 def fin(self):
1155 1156 return self._fin
1156 1157
1157 1158 @fin.setter
1158 1159 def fin(self, f):
1159 1160 self._fin = f
1160 1161
1161 1162 @property
1162 1163 def fmsg(self):
1163 1164 """Stream dedicated for status/error messages; may be None if
1164 1165 fout/ferr are used"""
1165 1166 return self._fmsg
1166 1167
1167 1168 @fmsg.setter
1168 1169 def fmsg(self, f):
1169 1170 self._fmsg = f
1170 1171 self._fmsgout, self._fmsgerr = _selectmsgdests(self)
1171 1172
1172 1173 @contextlib.contextmanager
1173 1174 def silent(
1174 1175 self, error: bool = False, subproc: bool = False, labeled: bool = False
1175 1176 ):
1176 1177 self.pushbuffer(error=error, subproc=subproc, labeled=labeled)
1177 1178 try:
1178 1179 yield
1179 1180 finally:
1180 1181 self.popbuffer()
1181 1182
1182 1183 def pushbuffer(
1183 1184 self, error: bool = False, subproc: bool = False, labeled: bool = False
1184 1185 ) -> None:
1185 1186 """install a buffer to capture standard output of the ui object
1186 1187
1187 1188 If error is True, the error output will be captured too.
1188 1189
1189 1190 If subproc is True, output from subprocesses (typically hooks) will be
1190 1191 captured too.
1191 1192
1192 1193 If labeled is True, any labels associated with buffered
1193 1194 output will be handled. By default, this has no effect
1194 1195 on the output returned, but extensions and GUI tools may
1195 1196 handle this argument and returned styled output. If output
1196 1197 is being buffered so it can be captured and parsed or
1197 1198 processed, labeled should not be set to True.
1198 1199 """
1199 1200 self._buffers.append([])
1200 1201 self._bufferstates.append((error, subproc, labeled))
1201 1202 self._bufferapplylabels = labeled
1202 1203
1203 1204 def popbuffer(self) -> bytes:
1204 1205 '''pop the last buffer and return the buffered output'''
1205 1206 self._bufferstates.pop()
1206 1207 if self._bufferstates:
1207 1208 self._bufferapplylabels = self._bufferstates[-1][2]
1208 1209 else:
1209 1210 self._bufferapplylabels = None
1210 1211
1211 1212 return b"".join(self._buffers.pop())
1212 1213
1213 1214 def _isbuffered(self, dest) -> bool:
1214 1215 if dest is self._fout:
1215 1216 return bool(self._buffers)
1216 1217 if dest is self._ferr:
1217 1218 return bool(self._bufferstates and self._bufferstates[-1][0])
1218 1219 return False
1219 1220
1220 1221 def canwritewithoutlabels(self) -> bool:
1221 1222 '''check if write skips the label'''
1222 1223 if self._buffers and not self._bufferapplylabels:
1223 1224 return True
1224 1225 return self._colormode is None
1225 1226
1226 1227 def canbatchlabeledwrites(self) -> bool:
1227 1228 '''check if write calls with labels are batchable'''
1228 1229 # Windows color printing is special, see ``write``.
1229 1230 return self._colormode != b'win32'
1230 1231
1231 1232 def write(self, *args: bytes, **opts: _MsgOpts) -> None:
1232 1233 """write args to output
1233 1234
1234 1235 By default, this method simply writes to the buffer or stdout.
1235 1236 Color mode can be set on the UI class to have the output decorated
1236 1237 with color modifier before being written to stdout.
1237 1238
1238 1239 The color used is controlled by an optional keyword argument, "label".
1239 1240 This should be a string containing label names separated by space.
1240 1241 Label names take the form of "topic.type". For example, ui.debug()
1241 1242 issues a label of "ui.debug".
1242 1243
1243 1244 Progress reports via stderr are normally cleared before writing as
1244 1245 stdout and stderr go to the same terminal. This can be skipped with
1245 1246 the optional keyword argument "keepprogressbar". The progress bar
1246 1247 will continue to occupy a partial line on stderr in that case.
1247 1248 This functionality is intended when Mercurial acts as data source
1248 1249 in a pipe.
1249 1250
1250 1251 When labeling output for a specific command, a label of
1251 1252 "cmdname.type" is recommended. For example, status issues
1252 1253 a label of "status.modified" for modified files.
1253 1254 """
1254 1255 dest = self._fout
1255 1256
1256 1257 # inlined _write() for speed
1257 1258 if self._buffers:
1258 1259 label = opts.get('label', b'')
1259 1260 if label and self._bufferapplylabels:
1260 1261 self._buffers[-1].extend(self.label(a, label) for a in args)
1261 1262 else:
1262 1263 self._buffers[-1].extend(args)
1263 1264 return
1264 1265
1265 1266 # inlined _writenobuf() for speed
1266 1267 if not opts.get('keepprogressbar', False):
1267 1268 self._progclear()
1268 1269 msg = b''.join(args)
1269 1270
1270 1271 # opencode timeblockedsection because this is a critical path
1271 1272 starttime = util.timer()
1272 1273 try:
1273 1274 if self._colormode == b'win32':
1274 1275 # windows color printing is its own can of crab, defer to
1275 1276 # the color module and that is it.
1276 1277 color.win32print(self, dest.write, msg, **opts)
1277 1278 else:
1278 1279 if self._colormode is not None:
1279 1280 label = opts.get('label', b'')
1280 1281 msg = self.label(msg, label)
1281 1282 dest.write(msg)
1282 1283 except IOError as err:
1283 1284 raise error.StdioError(err)
1284 1285 finally:
1285 1286 self._blockedtimes[b'stdio_blocked'] += (
1286 1287 util.timer() - starttime
1287 1288 ) * 1000
1288 1289
1289 1290 def write_err(self, *args: bytes, **opts: _MsgOpts) -> None:
1290 1291 self._write(self._ferr, *args, **opts)
1291 1292
1292 1293 def _write(self, dest, *args: bytes, **opts: _MsgOpts) -> None:
1293 1294 # update write() as well if you touch this code
1294 1295 if self._isbuffered(dest):
1295 1296 label = opts.get('label', b'')
1296 1297 if label and self._bufferapplylabels:
1297 1298 self._buffers[-1].extend(self.label(a, label) for a in args)
1298 1299 else:
1299 1300 self._buffers[-1].extend(args)
1300 1301 else:
1301 1302 self._writenobuf(dest, *args, **opts)
1302 1303
1303 1304 def _writenobuf(self, dest, *args: bytes, **opts: _MsgOpts) -> None:
1304 1305 # update write() as well if you touch this code
1305 1306 if not opts.get('keepprogressbar', False):
1306 1307 self._progclear()
1307 1308 msg = b''.join(args)
1308 1309
1309 1310 # opencode timeblockedsection because this is a critical path
1310 1311 starttime = util.timer()
1311 1312 try:
1312 1313 if dest is self._ferr and not getattr(self._fout, 'closed', False):
1313 1314 self._fout.flush()
1314 1315 if getattr(dest, 'structured', False):
1315 1316 # channel for machine-readable output with metadata, where
1316 1317 # no extra colorization is necessary.
1317 1318 dest.write(msg, **opts)
1318 1319 elif self._colormode == b'win32':
1319 1320 # windows color printing is its own can of crab, defer to
1320 1321 # the color module and that is it.
1321 1322 color.win32print(self, dest.write, msg, **opts)
1322 1323 else:
1323 1324 if self._colormode is not None:
1324 1325 label = opts.get('label', b'')
1325 1326 msg = self.label(msg, label)
1326 1327 dest.write(msg)
1327 1328 # stderr may be buffered under win32 when redirected to files,
1328 1329 # including stdout.
1329 1330 if dest is self._ferr and not getattr(dest, 'closed', False):
1330 1331 dest.flush()
1331 1332 except IOError as err:
1332 1333 if dest is self._ferr and err.errno in (
1333 1334 errno.EPIPE,
1334 1335 errno.EIO,
1335 1336 errno.EBADF,
1336 1337 ):
1337 1338 # no way to report the error, so ignore it
1338 1339 return
1339 1340 raise error.StdioError(err)
1340 1341 finally:
1341 1342 self._blockedtimes[b'stdio_blocked'] += (
1342 1343 util.timer() - starttime
1343 1344 ) * 1000
1344 1345
1345 1346 def _writemsg(self, dest, *args: bytes, **opts: _MsgOpts) -> None:
1346 1347 timestamp = self.showtimestamp and opts.get('type') in {
1347 1348 b'debug',
1348 1349 b'error',
1349 1350 b'note',
1350 1351 b'status',
1351 1352 b'warning',
1352 1353 }
1353 1354 if timestamp:
1354 1355 args = (
1355 1356 b'[%s] '
1356 1357 % pycompat.bytestr(datetime.datetime.now().isoformat()),
1357 1358 ) + args
1358 1359 _writemsgwith(self._write, dest, *args, **opts)
1359 1360 if timestamp:
1360 1361 dest.flush()
1361 1362
1362 1363 def _writemsgnobuf(self, dest, *args: bytes, **opts: _MsgOpts) -> None:
1363 1364 _writemsgwith(self._writenobuf, dest, *args, **opts)
1364 1365
1365 1366 def flush(self) -> None:
1366 1367 # opencode timeblockedsection because this is a critical path
1367 1368 starttime = util.timer()
1368 1369 try:
1369 1370 try:
1370 1371 self._fout.flush()
1371 1372 except IOError as err:
1372 1373 if err.errno not in (errno.EPIPE, errno.EIO, errno.EBADF):
1373 1374 raise error.StdioError(err)
1374 1375 finally:
1375 1376 try:
1376 1377 self._ferr.flush()
1377 1378 except IOError as err:
1378 1379 if err.errno not in (errno.EPIPE, errno.EIO, errno.EBADF):
1379 1380 raise error.StdioError(err)
1380 1381 finally:
1381 1382 self._blockedtimes[b'stdio_blocked'] += (
1382 1383 util.timer() - starttime
1383 1384 ) * 1000
1384 1385
1385 1386 def _isatty(self, fh) -> bool:
1386 1387 if self.configbool(b'ui', b'nontty'):
1387 1388 return False
1388 1389 return procutil.isatty(fh)
1389 1390
1390 1391 def protectfinout(self):
1391 1392 """Duplicate ui streams and redirect original if they are stdio
1392 1393
1393 1394 Returns (fin, fout) which point to the original ui fds, but may be
1394 1395 copy of them. The returned streams can be considered "owned" in that
1395 1396 print(), exec(), etc. never reach to them.
1396 1397 """
1397 1398 if self._finoutredirected:
1398 1399 # if already redirected, protectstdio() would just create another
1399 1400 # nullfd pair, which is equivalent to returning self._fin/_fout.
1400 1401 return self._fin, self._fout
1401 1402 fin, fout = procutil.protectstdio(self._fin, self._fout)
1402 1403 self._finoutredirected = (fin, fout) != (self._fin, self._fout)
1403 1404 return fin, fout
1404 1405
1405 1406 def restorefinout(self, fin, fout):
1406 1407 """Restore ui streams from possibly duplicated (fin, fout)"""
1407 1408 if (fin, fout) == (self._fin, self._fout):
1408 1409 return
1409 1410 procutil.restorestdio(self._fin, self._fout, fin, fout)
1410 1411 # protectfinout() won't create more than one duplicated streams,
1411 1412 # so we can just turn the redirection flag off.
1412 1413 self._finoutredirected = False
1413 1414
1414 1415 @contextlib.contextmanager
1415 1416 def protectedfinout(self):
1416 1417 """Run code block with protected standard streams"""
1417 1418 fin, fout = self.protectfinout()
1418 1419 try:
1419 1420 yield fin, fout
1420 1421 finally:
1421 1422 self.restorefinout(fin, fout)
1422 1423
1423 1424 def disablepager(self) -> None:
1424 1425 self._disablepager = True
1425 1426
1426 1427 def pager(self, command: bytes) -> None:
1427 1428 """Start a pager for subsequent command output.
1428 1429
1429 1430 Commands which produce a long stream of output should call
1430 1431 this function to activate the user's preferred pagination
1431 1432 mechanism (which may be no pager). Calling this function
1432 1433 precludes any future use of interactive functionality, such as
1433 1434 prompting the user or activating curses.
1434 1435
1435 1436 Args:
1436 1437 command: The full, non-aliased name of the command. That is, "log"
1437 1438 not "history, "summary" not "summ", etc.
1438 1439 """
1439 1440 if self._disablepager or self.pageractive:
1440 1441 # how pager should do is already determined
1441 1442 return
1442 1443
1443 1444 if not command.startswith(b'internal-always-') and (
1444 1445 # explicit --pager=on (= 'internal-always-' prefix) should
1445 1446 # take precedence over disabling factors below
1446 1447 command in self.configlist(b'pager', b'ignore')
1447 1448 or not self.configbool(b'ui', b'paginate')
1448 1449 or not self.configbool(b'pager', b'attend-' + command, True)
1449 1450 or encoding.environ.get(b'TERM') == b'dumb'
1450 1451 # TODO: if we want to allow HGPLAINEXCEPT=pager,
1451 1452 # formatted() will need some adjustment.
1452 1453 or not self.formatted()
1453 1454 or self.plain()
1454 1455 or self._buffers
1455 1456 # TODO: expose debugger-enabled on the UI object
1456 1457 or b'--debugger' in pycompat.sysargv
1457 1458 ):
1458 1459 # We only want to paginate if the ui appears to be
1459 1460 # interactive, the user didn't say HGPLAIN or
1460 1461 # HGPLAINEXCEPT=pager, and the user didn't specify --debug.
1461 1462 return
1462 1463
1463 1464 # py2exe doesn't appear to be able to use legacy I/O, and nothing is
1464 1465 # output to the pager for paged commands. Piping to `more` in cmd.exe
1465 1466 # works, but is easy to forget. Just disable pager for py2exe, but
1466 1467 # leave it working for pyoxidizer and exewrapper builds.
1467 1468 if pycompat.iswindows and getattr(sys, "frozen", None) == "console_exe":
1468 1469 self.debug(b"pager is unavailable with py2exe packaging\n")
1469 1470 return
1470 1471
1471 1472 pagercmd = self.config(b'pager', b'pager', rcutil.fallbackpager)
1472 1473 if not pagercmd:
1473 1474 return
1474 1475
1475 1476 pagerenv = {}
1476 1477 for name, value in rcutil.defaultpagerenv().items():
1477 1478 if name not in encoding.environ:
1478 1479 pagerenv[name] = value
1479 1480
1480 1481 self.debug(
1481 1482 b'starting pager for command %s\n' % stringutil.pprint(command)
1482 1483 )
1483 1484 self.flush()
1484 1485
1485 1486 wasformatted = self.formatted()
1486 1487 if util.safehasattr(signal, b"SIGPIPE"):
1487 1488 signal.signal(signal.SIGPIPE, _catchterm)
1488 1489 if self._runpager(pagercmd, pagerenv):
1489 1490 self.pageractive = True
1490 1491 # Preserve the formatted-ness of the UI. This is important
1491 1492 # because we mess with stdout, which might confuse
1492 1493 # auto-detection of things being formatted.
1493 1494 self.setconfig(b'ui', b'formatted', wasformatted, b'pager')
1494 1495 self.setconfig(b'ui', b'interactive', False, b'pager')
1495 1496
1496 1497 # If pagermode differs from color.mode, reconfigure color now that
1497 1498 # pageractive is set.
1498 1499 cm = self._colormode
1499 1500 if cm != self.config(b'color', b'pagermode', cm):
1500 1501 color.setup(self)
1501 1502 else:
1502 1503 # If the pager can't be spawned in dispatch when --pager=on is
1503 1504 # given, don't try again when the command runs, to avoid a duplicate
1504 1505 # warning about a missing pager command.
1505 1506 self.disablepager()
1506 1507
1507 1508 def _runpager(self, command: bytes, env=None) -> bool:
1508 1509 """Actually start the pager and set up file descriptors.
1509 1510
1510 1511 This is separate in part so that extensions (like chg) can
1511 1512 override how a pager is invoked.
1512 1513 """
1513 1514 if command == b'cat':
1514 1515 # Save ourselves some work.
1515 1516 return False
1516 1517 # If the command doesn't contain any of these characters, we
1517 1518 # assume it's a binary and exec it directly. This means for
1518 1519 # simple pager command configurations, we can degrade
1519 1520 # gracefully and tell the user about their broken pager.
1520 1521 shell = any(c in command for c in b"|&;<>()$`\\\"' \t\n*?[#~=%")
1521 1522
1522 1523 if pycompat.iswindows and not shell:
1523 1524 # Window's built-in `more` cannot be invoked with shell=False, but
1524 1525 # its `more.com` can. Hide this implementation detail from the
1525 1526 # user so we can also get sane bad PAGER behavior. MSYS has
1526 1527 # `more.exe`, so do a cmd.exe style resolution of the executable to
1527 1528 # determine which one to use.
1528 1529 fullcmd = procutil.findexe(command)
1529 1530 if not fullcmd:
1530 1531 self.warn(
1531 1532 _(b"missing pager command '%s', skipping pager\n") % command
1532 1533 )
1533 1534 return False
1534 1535
1535 1536 command = fullcmd
1536 1537
1537 1538 try:
1538 1539 pager = subprocess.Popen(
1539 1540 procutil.tonativestr(command),
1540 1541 shell=shell,
1541 1542 bufsize=-1,
1542 1543 close_fds=procutil.closefds,
1543 1544 stdin=subprocess.PIPE,
1544 1545 stdout=procutil.stdout,
1545 1546 stderr=procutil.stderr,
1546 1547 env=procutil.tonativeenv(procutil.shellenviron(env)),
1547 1548 )
1548 1549 except FileNotFoundError:
1549 1550 if not shell:
1550 1551 self.warn(
1551 1552 _(b"missing pager command '%s', skipping pager\n") % command
1552 1553 )
1553 1554 return False
1554 1555 raise
1555 1556
1556 1557 # back up original file descriptors
1557 1558 stdoutfd = os.dup(procutil.stdout.fileno())
1558 1559 stderrfd = os.dup(procutil.stderr.fileno())
1559 1560
1560 1561 os.dup2(pager.stdin.fileno(), procutil.stdout.fileno())
1561 1562 if self._isatty(procutil.stderr):
1562 1563 os.dup2(pager.stdin.fileno(), procutil.stderr.fileno())
1563 1564
1564 1565 @self.atexit
1565 1566 def killpager():
1566 1567 if util.safehasattr(signal, b"SIGINT"):
1567 1568 signal.signal(signal.SIGINT, signal.SIG_IGN)
1568 1569 # restore original fds, closing pager.stdin copies in the process
1569 1570 os.dup2(stdoutfd, procutil.stdout.fileno())
1570 1571 os.dup2(stderrfd, procutil.stderr.fileno())
1571 1572 pager.stdin.close()
1572 1573 pager.wait()
1573 1574
1574 1575 return True
1575 1576
1576 1577 @property
1577 1578 def _exithandlers(self):
1578 1579 return _reqexithandlers
1579 1580
1580 1581 def atexit(self, func, *args, **kwargs):
1581 1582 """register a function to run after dispatching a request
1582 1583
1583 1584 Handlers do not stay registered across request boundaries."""
1584 1585 self._exithandlers.append((func, args, kwargs))
1585 1586 return func
1586 1587
1587 1588 def interface(self, feature: bytes) -> bytes:
1588 1589 """what interface to use for interactive console features?
1589 1590
1590 1591 The interface is controlled by the value of `ui.interface` but also by
1591 1592 the value of feature-specific configuration. For example:
1592 1593
1593 1594 ui.interface.histedit = text
1594 1595 ui.interface.chunkselector = curses
1595 1596
1596 1597 Here the features are "histedit" and "chunkselector".
1597 1598
1598 1599 The configuration above means that the default interfaces for commands
1599 1600 is curses, the interface for histedit is text and the interface for
1600 1601 selecting chunk is crecord (the best curses interface available).
1601 1602
1602 1603 Consider the following example:
1603 1604 ui.interface = curses
1604 1605 ui.interface.histedit = text
1605 1606
1606 1607 Then histedit will use the text interface and chunkselector will use
1607 1608 the default curses interface (crecord at the moment).
1608 1609 """
1609 1610 alldefaults = frozenset([b"text", b"curses"])
1610 1611
1611 1612 featureinterfaces = {
1612 1613 b"chunkselector": [
1613 1614 b"text",
1614 1615 b"curses",
1615 1616 ],
1616 1617 b"histedit": [
1617 1618 b"text",
1618 1619 b"curses",
1619 1620 ],
1620 1621 }
1621 1622
1622 1623 # Feature-specific interface
1623 1624 if feature not in featureinterfaces.keys():
1624 1625 # Programming error, not user error
1625 1626 raise ValueError(b"Unknown feature requested %s" % feature)
1626 1627
1627 1628 availableinterfaces = frozenset(featureinterfaces[feature])
1628 1629 if alldefaults > availableinterfaces:
1629 1630 # Programming error, not user error. We need a use case to
1630 1631 # define the right thing to do here.
1631 1632 raise ValueError(
1632 1633 b"Feature %s does not handle all default interfaces" % feature
1633 1634 )
1634 1635
1635 1636 if self.plain() or encoding.environ.get(b'TERM') == b'dumb':
1636 1637 return b"text"
1637 1638
1638 1639 # Default interface for all the features
1639 1640 defaultinterface = b"text"
1640 1641 i = self.config(b"ui", b"interface")
1641 1642 if i in alldefaults:
1642 1643 defaultinterface = cast(bytes, i) # cast to help pytype
1643 1644
1644 1645 choseninterface: bytes = defaultinterface
1645 1646 f = self.config(b"ui", b"interface.%s" % feature)
1646 1647 if f in availableinterfaces:
1647 1648 choseninterface = cast(bytes, f) # cast to help pytype
1648 1649
1649 1650 if i is not None and defaultinterface != i:
1650 1651 if f is not None:
1651 1652 self.warn(_(b"invalid value for ui.interface: %s\n") % (i,))
1652 1653 else:
1653 1654 self.warn(
1654 1655 _(b"invalid value for ui.interface: %s (using %s)\n")
1655 1656 % (i, choseninterface)
1656 1657 )
1657 1658 if f is not None and choseninterface != f:
1658 1659 self.warn(
1659 1660 _(b"invalid value for ui.interface.%s: %s (using %s)\n")
1660 1661 % (feature, f, choseninterface)
1661 1662 )
1662 1663
1663 1664 return choseninterface
1664 1665
1665 1666 def interactive(self):
1666 1667 """is interactive input allowed?
1667 1668
1668 1669 An interactive session is a session where input can be reasonably read
1669 1670 from `sys.stdin'. If this function returns false, any attempt to read
1670 1671 from stdin should fail with an error, unless a sensible default has been
1671 1672 specified.
1672 1673
1673 1674 Interactiveness is triggered by the value of the `ui.interactive'
1674 1675 configuration variable or - if it is unset - when `sys.stdin' points
1675 1676 to a terminal device.
1676 1677
1677 1678 This function refers to input only; for output, see `ui.formatted()'.
1678 1679 """
1679 1680 i = self.configbool(b"ui", b"interactive")
1680 1681 if i is None:
1681 1682 # some environments replace stdin without implementing isatty
1682 1683 # usually those are non-interactive
1683 1684 return self._isatty(self._fin)
1684 1685
1685 1686 return i
1686 1687
1687 1688 def termwidth(self) -> int:
1688 1689 """how wide is the terminal in columns?"""
1689 1690 if b'COLUMNS' in encoding.environ:
1690 1691 try:
1691 1692 return int(encoding.environ[b'COLUMNS'])
1692 1693 except ValueError:
1693 1694 pass
1694 1695 return scmutil.termsize(self)[0]
1695 1696
1696 1697 def formatted(self):
1697 1698 """should formatted output be used?
1698 1699
1699 1700 It is often desirable to format the output to suite the output medium.
1700 1701 Examples of this are truncating long lines or colorizing messages.
1701 1702 However, this is not often not desirable when piping output into other
1702 1703 utilities, e.g. `grep'.
1703 1704
1704 1705 Formatted output is triggered by the value of the `ui.formatted'
1705 1706 configuration variable or - if it is unset - when `sys.stdout' points
1706 1707 to a terminal device. Please note that `ui.formatted' should be
1707 1708 considered an implementation detail; it is not intended for use outside
1708 1709 Mercurial or its extensions.
1709 1710
1710 1711 This function refers to output only; for input, see `ui.interactive()'.
1711 1712 This function always returns false when in plain mode, see `ui.plain()'.
1712 1713 """
1713 1714 if self.plain():
1714 1715 return False
1715 1716
1716 1717 i = self.configbool(b"ui", b"formatted")
1717 1718 if i is None:
1718 1719 # some environments replace stdout without implementing isatty
1719 1720 # usually those are non-interactive
1720 1721 return self._isatty(self._fout)
1721 1722
1722 1723 return i
1723 1724
1724 1725 def _readline(
1725 1726 self,
1726 1727 prompt: bytes = b' ',
1727 1728 promptopts: Optional[Dict[str, _MsgOpts]] = None,
1728 1729 ) -> bytes:
1729 1730 # Replacing stdin/stdout temporarily is a hard problem on Python 3
1730 1731 # because they have to be text streams with *no buffering*. Instead,
1731 1732 # we use rawinput() only if call_readline() will be invoked by
1732 1733 # PyOS_Readline(), so no I/O will be made at Python layer.
1733 1734 usereadline = (
1734 1735 self._isatty(self._fin)
1735 1736 and self._isatty(self._fout)
1736 1737 and procutil.isstdin(self._fin)
1737 1738 and procutil.isstdout(self._fout)
1738 1739 )
1739 1740 if usereadline:
1740 1741 try:
1741 1742 # magically add command line editing support, where
1742 1743 # available
1743 1744 import readline
1744 1745
1745 1746 # force demandimport to really load the module
1746 1747 readline.read_history_file
1747 1748 # windows sometimes raises something other than ImportError
1748 1749 except Exception:
1749 1750 usereadline = False
1750 1751
1751 1752 if self._colormode == b'win32' or not usereadline:
1752 1753 if not promptopts:
1753 1754 promptopts = {}
1754 1755 self._writemsgnobuf(
1755 1756 self._fmsgout, prompt, type=b'prompt', **promptopts
1756 1757 )
1757 1758 self.flush()
1758 1759 prompt = b' '
1759 1760 else:
1760 1761 prompt = self.label(prompt, b'ui.prompt') + b' '
1761 1762
1762 1763 # prompt ' ' must exist; otherwise readline may delete entire line
1763 1764 # - http://bugs.python.org/issue12833
1764 1765 with self.timeblockedsection(b'stdio'):
1765 1766 if usereadline:
1766 1767 self.flush()
1767 1768 prompt = encoding.strfromlocal(prompt)
1768 1769 line = encoding.strtolocal(input(prompt))
1769 1770 # When stdin is in binary mode on Windows, it can cause
1770 1771 # input() to emit an extra trailing carriage return
1771 1772 if pycompat.oslinesep == b'\r\n' and line.endswith(b'\r'):
1772 1773 line = line[:-1]
1773 1774 else:
1774 1775 self._fout.write(pycompat.bytestr(prompt))
1775 1776 self._fout.flush()
1776 1777 line = self._fin.readline()
1777 1778 if not line:
1778 1779 raise EOFError
1779 1780 line = line.rstrip(pycompat.oslinesep)
1780 1781
1781 1782 return line
1782 1783
1784 if pycompat.TYPE_CHECKING:
1785
1786 @overload
1787 def prompt(self, msg: bytes, default: bytes) -> bytes:
1788 pass
1789
1790 @overload
1791 def prompt(self, msg: bytes, default: None) -> Optional[bytes]:
1792 pass
1793
1783 1794 def prompt(self, msg, default=b"y"):
1784 1795 """Prompt user with msg, read response.
1785 1796 If ui is not interactive, the default is returned.
1786 1797 """
1787 1798 return self._prompt(msg, default=default)
1788 1799
1800 if pycompat.TYPE_CHECKING:
1801
1802 @overload
1803 def _prompt(
1804 self, msg: bytes, default: bytes, **opts: _MsgOpts
1805 ) -> bytes:
1806 pass
1807
1808 @overload
1809 def _prompt(
1810 self, msg: bytes, default: None, **opts: _MsgOpts
1811 ) -> Optional[bytes]:
1812 pass
1813
1789 1814 def _prompt(self, msg, default=b'y', **opts):
1790 1815 opts = {**opts, 'default': default}
1791 1816 if not self.interactive():
1792 1817 self._writemsg(self._fmsgout, msg, b' ', type=b'prompt', **opts)
1793 1818 self._writemsg(
1794 1819 self._fmsgout, default or b'', b"\n", type=b'promptecho'
1795 1820 )
1796 1821 return default
1797 1822 try:
1798 1823 r = self._readline(prompt=msg, promptopts=opts)
1799 1824 if not r:
1800 1825 r = default
1801 1826 if self.configbool(b'ui', b'promptecho'):
1802 1827 self._writemsg(
1803 1828 self._fmsgout, r or b'', b"\n", type=b'promptecho'
1804 1829 )
1805 1830 return r
1806 1831 except EOFError:
1807 1832 raise error.ResponseExpected()
1808 1833
1809 1834 @staticmethod
1810 1835 def extractchoices(prompt: bytes) -> Tuple[bytes, List[_PromptChoice]]:
1811 1836 """Extract prompt message and list of choices from specified prompt.
1812 1837
1813 1838 This returns tuple "(message, choices)", and "choices" is the
1814 1839 list of tuple "(response character, text without &)".
1815 1840
1816 1841 >>> ui.extractchoices(b"awake? $$ &Yes $$ &No")
1817 1842 ('awake? ', [('y', 'Yes'), ('n', 'No')])
1818 1843 >>> ui.extractchoices(b"line\\nbreak? $$ &Yes $$ &No")
1819 1844 ('line\\nbreak? ', [('y', 'Yes'), ('n', 'No')])
1820 1845 >>> ui.extractchoices(b"want lots of $$money$$?$$Ye&s$$N&o")
1821 1846 ('want lots of $$money$$?', [('s', 'Yes'), ('o', 'No')])
1822 1847 """
1823 1848
1824 1849 # Sadly, the prompt string may have been built with a filename
1825 1850 # containing "$$" so let's try to find the first valid-looking
1826 1851 # prompt to start parsing. Sadly, we also can't rely on
1827 1852 # choices containing spaces, ASCII, or basically anything
1828 1853 # except an ampersand followed by a character.
1829 1854 m = re.match(br'(?s)(.+?)\$\$([^$]*&[^ $].*)', prompt)
1830 1855
1831 1856 assert m is not None # help pytype
1832 1857
1833 1858 msg = m.group(1)
1834 1859 choices = [p.strip(b' ') for p in m.group(2).split(b'$$')]
1835 1860
1836 1861 def choicetuple(s):
1837 1862 ampidx = s.index(b'&')
1838 1863 return s[ampidx + 1 : ampidx + 2].lower(), s.replace(b'&', b'', 1)
1839 1864
1840 1865 return (msg, [choicetuple(s) for s in choices])
1841 1866
1842 1867 def promptchoice(self, prompt: bytes, default: int = 0) -> int:
1843 1868 """Prompt user with a message, read response, and ensure it matches
1844 1869 one of the provided choices. The prompt is formatted as follows:
1845 1870
1846 1871 "would you like fries with that (Yn)? $$ &Yes $$ &No"
1847 1872
1848 1873 The index of the choice is returned. Responses are case
1849 1874 insensitive. If ui is not interactive, the default is
1850 1875 returned.
1851 1876 """
1852 1877
1853 1878 msg, choices = self.extractchoices(prompt)
1854 1879 resps = [r for r, t in choices]
1855 1880 while True:
1856 1881 r = self._prompt(msg, default=resps[default], choices=choices)
1857 1882 if r.lower() in resps:
1858 1883 return resps.index(r.lower())
1859 1884 # TODO: shouldn't it be a warning?
1860 1885 self._writemsg(self._fmsgout, _(b"unrecognized response\n"))
1861 1886
1862 1887 def getpass(
1863 1888 self, prompt: Optional[bytes] = None, default: Optional[bytes] = None
1864 1889 ) -> Optional[bytes]:
1865 1890 if not self.interactive():
1866 1891 return default
1867 1892 try:
1868 1893 self._writemsg(
1869 1894 self._fmsgerr,
1870 1895 prompt or _(b'password: '),
1871 1896 type=b'prompt',
1872 1897 password=True,
1873 1898 )
1874 1899 # disable getpass() only if explicitly specified. it's still valid
1875 1900 # to interact with tty even if fin is not a tty.
1876 1901 with self.timeblockedsection(b'stdio'):
1877 1902 if self.configbool(b'ui', b'nontty'):
1878 1903 l = self._fin.readline()
1879 1904 if not l:
1880 1905 raise EOFError
1881 1906 return l.rstrip(b'\n')
1882 1907 else:
1883 1908 return util.get_password()
1884 1909 except EOFError:
1885 1910 raise error.ResponseExpected()
1886 1911
1887 1912 def status(self, *msg: bytes, **opts: _MsgOpts) -> None:
1888 1913 """write status message to output (if ui.quiet is False)
1889 1914
1890 1915 This adds an output label of "ui.status".
1891 1916 """
1892 1917 if not self.quiet:
1893 1918 self._writemsg(self._fmsgout, type=b'status', *msg, **opts)
1894 1919
1895 1920 def warn(self, *msg: bytes, **opts: _MsgOpts) -> None:
1896 1921 """write warning message to output (stderr)
1897 1922
1898 1923 This adds an output label of "ui.warning".
1899 1924 """
1900 1925 self._writemsg(self._fmsgerr, type=b'warning', *msg, **opts)
1901 1926
1902 1927 def error(self, *msg: bytes, **opts: _MsgOpts) -> None:
1903 1928 """write error message to output (stderr)
1904 1929
1905 1930 This adds an output label of "ui.error".
1906 1931 """
1907 1932 self._writemsg(self._fmsgerr, type=b'error', *msg, **opts)
1908 1933
1909 1934 def note(self, *msg: bytes, **opts: _MsgOpts) -> None:
1910 1935 """write note to output (if ui.verbose is True)
1911 1936
1912 1937 This adds an output label of "ui.note".
1913 1938 """
1914 1939 if self.verbose:
1915 1940 self._writemsg(self._fmsgout, type=b'note', *msg, **opts)
1916 1941
1917 1942 def debug(self, *msg: bytes, **opts: _MsgOpts) -> None:
1918 1943 """write debug message to output (if ui.debugflag is True)
1919 1944
1920 1945 This adds an output label of "ui.debug".
1921 1946 """
1922 1947 if self.debugflag:
1923 1948 self._writemsg(self._fmsgout, type=b'debug', *msg, **opts)
1924 1949 self.log(b'debug', b'%s', b''.join(msg))
1925 1950
1926 1951 # Aliases to defeat check-code.
1927 1952 statusnoi18n = status
1928 1953 notenoi18n = note
1929 1954 warnnoi18n = warn
1930 1955 writenoi18n = write
1931 1956
1932 1957 def edit(
1933 1958 self,
1934 1959 text: bytes,
1935 1960 user: bytes,
1936 1961 extra: Optional[Dict[bytes, Any]] = None, # TODO: value type of bytes?
1937 1962 editform=None,
1938 1963 pending=None,
1939 1964 repopath: Optional[bytes] = None,
1940 1965 action: Optional[bytes] = None,
1941 1966 ) -> bytes:
1942 1967 if action is None:
1943 1968 self.develwarn(
1944 1969 b'action is None but will soon be a required '
1945 1970 b'parameter to ui.edit()'
1946 1971 )
1947 1972 extra_defaults = {
1948 1973 b'prefix': b'editor',
1949 1974 b'suffix': b'.txt',
1950 1975 }
1951 1976 if extra is not None:
1952 1977 if extra.get(b'suffix') is not None:
1953 1978 self.develwarn(
1954 1979 b'extra.suffix is not None but will soon be '
1955 1980 b'ignored by ui.edit()'
1956 1981 )
1957 1982 extra_defaults.update(extra)
1958 1983 extra = extra_defaults
1959 1984
1960 1985 if action == b'diff':
1961 1986 suffix = b'.diff'
1962 1987 elif action:
1963 1988 suffix = b'.%s.hg.txt' % action
1964 1989 else:
1965 1990 suffix = extra[b'suffix']
1966 1991
1967 1992 rdir = None
1968 1993 if self.configbool(b'experimental', b'editortmpinhg'):
1969 1994 rdir = repopath
1970 1995 (fd, name) = pycompat.mkstemp(
1971 1996 prefix=b'hg-' + extra[b'prefix'] + b'-', suffix=suffix, dir=rdir
1972 1997 )
1973 1998 try:
1974 1999 with os.fdopen(fd, 'wb') as f:
1975 2000 f.write(util.tonativeeol(text))
1976 2001
1977 2002 environ = {b'HGUSER': user}
1978 2003 if b'transplant_source' in extra:
1979 2004 environ.update(
1980 2005 {b'HGREVISION': hex(extra[b'transplant_source'])}
1981 2006 )
1982 2007 for label in (b'intermediate-source', b'source', b'rebase_source'):
1983 2008 if label in extra:
1984 2009 environ.update({b'HGREVISION': extra[label]})
1985 2010 break
1986 2011 if editform:
1987 2012 environ.update({b'HGEDITFORM': editform})
1988 2013 if pending:
1989 2014 environ.update({b'HG_PENDING': pending})
1990 2015
1991 2016 editor = self.geteditor()
1992 2017
1993 2018 self.system(
1994 2019 b"%s \"%s\"" % (editor, name),
1995 2020 environ=environ,
1996 2021 onerr=error.CanceledError,
1997 2022 errprefix=_(b"edit failed"),
1998 2023 blockedtag=b'editor',
1999 2024 )
2000 2025
2001 2026 with open(name, 'rb') as f:
2002 2027 t = util.fromnativeeol(f.read())
2003 2028 finally:
2004 2029 os.unlink(name)
2005 2030
2006 2031 return t
2007 2032
2008 2033 def system(
2009 2034 self,
2010 2035 cmd: bytes,
2011 2036 environ=None,
2012 2037 cwd: Optional[bytes] = None,
2013 2038 onerr: Optional[Callable[[bytes], Exception]] = None,
2014 2039 errprefix: Optional[bytes] = None,
2015 2040 blockedtag: Optional[bytes] = None,
2016 2041 ) -> int:
2017 2042 """execute shell command with appropriate output stream. command
2018 2043 output will be redirected if fout is not stdout.
2019 2044
2020 2045 if command fails and onerr is None, return status, else raise onerr
2021 2046 object as exception.
2022 2047 """
2023 2048 if blockedtag is None:
2024 2049 # Long cmds tend to be because of an absolute path on cmd. Keep
2025 2050 # the tail end instead
2026 2051 cmdsuffix = cmd.translate(None, _keepalnum)[-85:]
2027 2052 blockedtag = b'unknown_system_' + cmdsuffix
2028 2053 out = self._fout
2029 2054 if any(s[1] for s in self._bufferstates):
2030 2055 out = self
2031 2056 with self.timeblockedsection(blockedtag):
2032 2057 rc = self._runsystem(cmd, environ=environ, cwd=cwd, out=out)
2033 2058 if rc and onerr:
2034 2059 errmsg = b'%s %s' % (
2035 2060 procutil.shellsplit(cmd)[0],
2036 2061 procutil.explainexit(rc),
2037 2062 )
2038 2063 if errprefix:
2039 2064 errmsg = b'%s: %s' % (errprefix, errmsg)
2040 2065 raise onerr(errmsg)
2041 2066 return rc
2042 2067
2043 2068 def _runsystem(self, cmd: bytes, environ, cwd: Optional[bytes], out) -> int:
2044 2069 """actually execute the given shell command (can be overridden by
2045 2070 extensions like chg)"""
2046 2071 return procutil.system(cmd, environ=environ, cwd=cwd, out=out)
2047 2072
2048 2073 def traceback(self, exc=None, force: bool = False):
2049 2074 """print exception traceback if traceback printing enabled or forced.
2050 2075 only to call in exception handler. returns true if traceback
2051 2076 printed."""
2052 2077 if self.tracebackflag or force:
2053 2078 if exc is None:
2054 2079 exc = sys.exc_info()
2055 2080 cause = getattr(exc[1], 'cause', None)
2056 2081
2057 2082 if cause is not None:
2058 2083 causetb = traceback.format_tb(cause[2])
2059 2084 exctb = traceback.format_tb(exc[2])
2060 2085 exconly = traceback.format_exception_only(cause[0], cause[1])
2061 2086
2062 2087 # exclude frame where 'exc' was chained and rethrown from exctb
2063 2088 self.write_err(
2064 2089 b'Traceback (most recent call last):\n',
2065 2090 encoding.strtolocal(''.join(exctb[:-1])),
2066 2091 encoding.strtolocal(''.join(causetb)),
2067 2092 encoding.strtolocal(''.join(exconly)),
2068 2093 )
2069 2094 else:
2070 2095 output = traceback.format_exception(exc[0], exc[1], exc[2])
2071 2096 self.write_err(encoding.strtolocal(''.join(output)))
2072 2097 return self.tracebackflag or force
2073 2098
2074 2099 def geteditor(self):
2075 2100 '''return editor to use'''
2076 2101 if pycompat.sysplatform == b'plan9':
2077 2102 # vi is the MIPS instruction simulator on Plan 9. We
2078 2103 # instead default to E to plumb commit messages to
2079 2104 # avoid confusion.
2080 2105 editor = b'E'
2081 2106 elif pycompat.isdarwin:
2082 2107 # vi on darwin is POSIX compatible to a fault, and that includes
2083 2108 # exiting non-zero if you make any mistake when running an ex
2084 2109 # command. Proof: `vi -c ':unknown' -c ':qa'; echo $?` produces 1,
2085 2110 # while s/vi/vim/ doesn't.
2086 2111 editor = b'vim'
2087 2112 else:
2088 2113 editor = b'vi'
2089 2114 return encoding.environ.get(b"HGEDITOR") or self.config(
2090 2115 b"ui", b"editor", editor
2091 2116 )
2092 2117
2093 2118 @util.propertycache
2094 2119 def _progbar(self) -> Optional[progress.progbar]:
2095 2120 """setup the progbar singleton to the ui object"""
2096 2121 if (
2097 2122 self.quiet
2098 2123 or self.debugflag
2099 2124 or self.configbool(b'progress', b'disable')
2100 2125 or not progress.shouldprint(self)
2101 2126 ):
2102 2127 return None
2103 2128 return getprogbar(self)
2104 2129
2105 2130 def _progclear(self) -> None:
2106 2131 """clear progress bar output if any. use it before any output"""
2107 2132 if not haveprogbar(): # nothing loaded yet
2108 2133 return
2109 2134 if self._progbar is not None and self._progbar.printed:
2110 2135 self._progbar.clear()
2111 2136
2112 2137 def makeprogress(
2113 2138 self, topic: bytes, unit: bytes = b"", total: Optional[int] = None
2114 2139 ) -> scmutil.progress:
2115 2140 """Create a progress helper for the specified topic"""
2116 2141 if getattr(self._fmsgerr, 'structured', False):
2117 2142 # channel for machine-readable output with metadata, just send
2118 2143 # raw information
2119 2144 # TODO: consider porting some useful information (e.g. estimated
2120 2145 # time) from progbar. we might want to support update delay to
2121 2146 # reduce the cost of transferring progress messages.
2122 2147 def updatebar(topic, pos, item, unit, total):
2123 2148 self._fmsgerr.write(
2124 2149 None,
2125 2150 type=b'progress',
2126 2151 topic=topic,
2127 2152 pos=pos,
2128 2153 item=item,
2129 2154 unit=unit,
2130 2155 total=total,
2131 2156 )
2132 2157
2133 2158 elif self._progbar is not None:
2134 2159 updatebar = self._progbar.progress
2135 2160 else:
2136 2161
2137 2162 def updatebar(topic, pos, item, unit, total):
2138 2163 pass
2139 2164
2140 2165 return scmutil.progress(self, updatebar, topic, unit, total)
2141 2166
2142 2167 def getlogger(self, name):
2143 2168 """Returns a logger of the given name; or None if not registered"""
2144 2169 return self._loggers.get(name)
2145 2170
2146 2171 def setlogger(self, name, logger) -> None:
2147 2172 """Install logger which can be identified later by the given name
2148 2173
2149 2174 More than one loggers can be registered. Use extension or module
2150 2175 name to uniquely identify the logger instance.
2151 2176 """
2152 2177 self._loggers[name] = logger
2153 2178
2154 2179 def log(self, event, msgfmt, *msgargs, **opts) -> None:
2155 2180 """hook for logging facility extensions
2156 2181
2157 2182 event should be a readily-identifiable subsystem, which will
2158 2183 allow filtering.
2159 2184
2160 2185 msgfmt should be a newline-terminated format string to log, and
2161 2186 *msgargs are %-formatted into it.
2162 2187
2163 2188 **opts currently has no defined meanings.
2164 2189 """
2165 2190 if not self._loggers:
2166 2191 return
2167 2192 activeloggers = [l for l in self._loggers.values() if l.tracked(event)]
2168 2193 if not activeloggers:
2169 2194 return
2170 2195 msg = msgfmt % msgargs
2171 2196 opts = pycompat.byteskwargs(opts)
2172 2197 # guard against recursion from e.g. ui.debug()
2173 2198 registeredloggers = self._loggers
2174 2199 self._loggers = {}
2175 2200 try:
2176 2201 for logger in activeloggers:
2177 2202 logger.log(self, event, msg, opts)
2178 2203 finally:
2179 2204 self._loggers = registeredloggers
2180 2205
2181 2206 def label(self, msg: bytes, label: bytes) -> bytes:
2182 2207 """style msg based on supplied label
2183 2208
2184 2209 If some color mode is enabled, this will add the necessary control
2185 2210 characters to apply such color. In addition, 'debug' color mode adds
2186 2211 markup showing which label affects a piece of text.
2187 2212
2188 2213 ui.write(s, 'label') is equivalent to
2189 2214 ui.write(ui.label(s, 'label')).
2190 2215 """
2191 2216 if self._colormode is not None:
2192 2217 return color.colorlabel(self, msg, label)
2193 2218 return msg
2194 2219
2195 2220 def develwarn(
2196 2221 self, msg: bytes, stacklevel: int = 1, config: Optional[bytes] = None
2197 2222 ) -> None:
2198 2223 """issue a developer warning message
2199 2224
2200 2225 Use 'stacklevel' to report the offender some layers further up in the
2201 2226 stack.
2202 2227 """
2203 2228 if not self.configbool(b'devel', b'all-warnings'):
2204 2229 if config is None or not self.configbool(b'devel', config):
2205 2230 return
2206 2231 msg = b'devel-warn: ' + msg
2207 2232 stacklevel += 1 # get in develwarn
2208 2233 if self.tracebackflag:
2209 2234 util.debugstacktrace(msg, stacklevel, self._ferr, self._fout)
2210 2235 self.log(
2211 2236 b'develwarn',
2212 2237 b'%s at:\n%s'
2213 2238 % (msg, b''.join(util.getstackframes(stacklevel))),
2214 2239 )
2215 2240 else:
2216 2241 curframe = inspect.currentframe()
2217 2242 calframe = inspect.getouterframes(curframe, 2)
2218 2243 fname, lineno, fmsg = calframe[stacklevel][1:4]
2219 2244 fname, fmsg = pycompat.sysbytes(fname), pycompat.sysbytes(fmsg)
2220 2245 self.write_err(b'%s at: %s:%d (%s)\n' % (msg, fname, lineno, fmsg))
2221 2246 self.log(
2222 2247 b'develwarn', b'%s at: %s:%d (%s)\n', msg, fname, lineno, fmsg
2223 2248 )
2224 2249
2225 2250 # avoid cycles
2226 2251 del curframe
2227 2252 del calframe
2228 2253
2229 2254 def deprecwarn(
2230 2255 self, msg: bytes, version: bytes, stacklevel: int = 2
2231 2256 ) -> None:
2232 2257 """issue a deprecation warning
2233 2258
2234 2259 - msg: message explaining what is deprecated and how to upgrade,
2235 2260 - version: last version where the API will be supported,
2236 2261 """
2237 2262 if not (
2238 2263 self.configbool(b'devel', b'all-warnings')
2239 2264 or self.configbool(b'devel', b'deprec-warn')
2240 2265 ):
2241 2266 return
2242 2267 msg += (
2243 2268 b"\n(compatibility will be dropped after Mercurial-%s,"
2244 2269 b" update your code.)"
2245 2270 ) % version
2246 2271 self.develwarn(msg, stacklevel=stacklevel, config=b'deprec-warn')
2247 2272
2248 2273 def exportableenviron(self):
2249 2274 """The environment variables that are safe to export, e.g. through
2250 2275 hgweb.
2251 2276 """
2252 2277 return self._exportableenviron
2253 2278
2254 2279 @contextlib.contextmanager
2255 2280 def configoverride(self, overrides: _ConfigItems, source: bytes = b""):
2256 2281 """Context manager for temporary config overrides
2257 2282 `overrides` must be a dict of the following structure:
2258 2283 {(section, name) : value}"""
2259 2284 backups = {}
2260 2285 try:
2261 2286 for (section, name), value in overrides.items():
2262 2287 backups[(section, name)] = self.backupconfig(section, name)
2263 2288 self.setconfig(section, name, value, source)
2264 2289 yield
2265 2290 finally:
2266 2291 for __, backup in backups.items():
2267 2292 self.restoreconfig(backup)
2268 2293 # just restoring ui.quiet config to the previous value is not enough
2269 2294 # as it does not update ui.quiet class member
2270 2295 if (b'ui', b'quiet') in overrides:
2271 2296 self.fixconfig(section=b'ui')
2272 2297
2273 2298 def estimatememory(self) -> Optional[int]:
2274 2299 """Provide an estimate for the available system memory in Bytes.
2275 2300
2276 2301 This can be overriden via ui.available-memory. It returns None, if
2277 2302 no estimate can be computed.
2278 2303 """
2279 2304 value = self.config(b'ui', b'available-memory')
2280 2305 if value is not None:
2281 2306 try:
2282 2307 return util.sizetoint(value)
2283 2308 except error.ParseError:
2284 2309 raise error.ConfigError(
2285 2310 _(b"ui.available-memory value is invalid ('%s')") % value
2286 2311 )
2287 2312 return util._estimatememory()
2288 2313
2289 2314
2290 2315 # we instantiate one globally shared progress bar to avoid
2291 2316 # competing progress bars when multiple UI objects get created
2292 2317 _progresssingleton: Optional[progress.progbar] = None
2293 2318
2294 2319
2295 2320 def getprogbar(ui: ui) -> progress.progbar:
2296 2321 global _progresssingleton
2297 2322 if _progresssingleton is None:
2298 2323 # passing 'ui' object to the singleton is fishy,
2299 2324 # this is how the extension used to work but feel free to rework it.
2300 2325 _progresssingleton = progress.progbar(ui)
2301 2326 return _progresssingleton
2302 2327
2303 2328
2304 2329 def haveprogbar() -> bool:
2305 2330 return _progresssingleton is not None
2306 2331
2307 2332
2308 2333 def _selectmsgdests(ui: ui):
2309 2334 name = ui.config(b'ui', b'message-output')
2310 2335 if name == b'channel':
2311 2336 if ui.fmsg:
2312 2337 return ui.fmsg, ui.fmsg
2313 2338 else:
2314 2339 # fall back to ferr if channel isn't ready so that status/error
2315 2340 # messages can be printed
2316 2341 return ui.ferr, ui.ferr
2317 2342 if name == b'stdio':
2318 2343 return ui.fout, ui.ferr
2319 2344 if name == b'stderr':
2320 2345 return ui.ferr, ui.ferr
2321 2346 raise error.Abort(b'invalid ui.message-output destination: %s' % name)
2322 2347
2323 2348
2324 2349 def _writemsgwith(write, dest, *args: bytes, **opts: _MsgOpts) -> None:
2325 2350 """Write ui message with the given ui._write*() function
2326 2351
2327 2352 The specified message type is translated to 'ui.<type>' label if the dest
2328 2353 isn't a structured channel, so that the message will be colorized.
2329 2354 """
2330 2355 # TODO: maybe change 'type' to a mandatory option
2331 2356 if 'type' in opts and not getattr(dest, 'structured', False):
2332 2357 opts['label'] = opts.get('label', b'') + b' ui.%s' % opts.pop('type')
2333 2358 write(dest, *args, **opts)
General Comments 0
You need to be logged in to leave comments. Login now