# HG changeset patch # User Pierre-Yves David # Date 2023-12-19 20:29:34 # Node ID f15cb5111a1e4f7eb2e4c42e22ce0aad306884e2 # Parent 81224afd938dba86159ff739ca990fb17e5edece pytype: move some type comment to proper annotation We support direct type annotations now, while pytype is starting to complain about them. diff --git a/mercurial/branchmap.py b/mercurial/branchmap.py --- a/mercurial/branchmap.py +++ b/mercurial/branchmap.py @@ -196,15 +196,16 @@ class branchcache: def __init__( self, - repo, - entries=(), - tipnode=None, - tiprev=nullrev, - filteredhash=None, - closednodes=None, - hasnode=None, - ): - # type: (localrepo.localrepository, Union[Dict[bytes, List[bytes]], Iterable[Tuple[bytes, List[bytes]]]], bytes, int, Optional[bytes], Optional[Set[bytes]], Optional[Callable[[bytes], bool]]) -> None + repo: "localrepo.localrepository", + entries: Union[ + Dict[bytes, List[bytes]], Iterable[Tuple[bytes, List[bytes]]] + ] = (), + tipnode: Optional[bytes] = None, + tiprev: Optional[int] = nullrev, + filteredhash: Optional[bytes] = None, + closednodes: Optional[Set[bytes]] = None, + hasnode: Optional[Callable[[bytes], bool]] = None, + ) -> None: """hasnode is a function which can be used to verify whether changelog has a given node or not. 
If it's not provided, we assume that every node we have exists in changelog""" diff --git a/mercurial/cmdutil.py b/mercurial/cmdutil.py --- a/mercurial/cmdutil.py +++ b/mercurial/cmdutil.py @@ -4118,8 +4118,10 @@ def abortgraft(ui, repo, graftstate): return 0 -def readgraftstate(repo, graftstate): - # type: (Any, statemod.cmdstate) -> Dict[bytes, Any] +def readgraftstate( + repo: Any, + graftstate: statemod.cmdstate, +) -> Dict[bytes, Any]: """read the graft state file and return a dict of the data stored in it""" try: return graftstate.read() diff --git a/mercurial/encoding.py b/mercurial/encoding.py --- a/mercurial/encoding.py +++ b/mercurial/encoding.py @@ -59,8 +59,7 @@ unichr = chr assert all(i.startswith((b"\xe2", b"\xef")) for i in _ignore) -def hfsignoreclean(s): - # type: (bytes) -> bytes +def hfsignoreclean(s: bytes) -> bytes: """Remove codepoints ignored by HFS+ from s. >>> hfsignoreclean(u'.h\u200cg'.encode('utf-8')) @@ -133,8 +132,7 @@ class localstr(bytes): if typing.TYPE_CHECKING: # pseudo implementation to help pytype see localstr() constructor - def __init__(self, u, l): - # type: (bytes, bytes) -> None + def __init__(self, u: bytes, l: bytes) -> None: super(localstr, self).__init__(l) self._utf8 = u @@ -153,8 +151,7 @@ class safelocalstr(bytes): """ -def tolocal(s): - # type: (bytes) -> bytes +def tolocal(s: bytes) -> bytes: """ Convert a string from internal UTF-8 to local encoding @@ -222,8 +219,7 @@ def tolocal(s): ) -def fromlocal(s): - # type: (bytes) -> bytes +def fromlocal(s: bytes) -> bytes: """ Convert a string from the local character encoding to UTF-8 @@ -254,20 +250,17 @@ def fromlocal(s): ) -def unitolocal(u): - # type: (Text) -> bytes +def unitolocal(u: str) -> bytes: """Convert a unicode string to a byte string of local encoding""" return tolocal(u.encode('utf-8')) -def unifromlocal(s): - # type: (bytes) -> Text +def unifromlocal(s: bytes) -> str: """Convert a byte string of local encoding to a unicode string""" return 
fromlocal(s).decode('utf-8') -def unimethod(bytesfunc): - # type: (Callable[[Any], bytes]) -> Callable[[Any], Text] +def unimethod(bytesfunc: Callable[[Any], bytes]) -> Callable[[Any], str]: """Create a proxy method that forwards __unicode__() and __str__() of Python 3 to __bytes__()""" @@ -285,8 +278,7 @@ strfromlocal = unifromlocal strmethod = unimethod -def lower(s): - # type: (bytes) -> bytes +def lower(s: bytes) -> bytes: """best-effort encoding-aware case-folding of local string s""" try: return asciilower(s) @@ -310,8 +302,7 @@ def lower(s): ) -def upper(s): - # type: (bytes) -> bytes +def upper(s: bytes) -> bytes: """best-effort encoding-aware case-folding of local string s""" try: return asciiupper(s) @@ -319,8 +310,7 @@ def upper(s): return upperfallback(s) -def upperfallback(s): - # type: (Any) -> Any +def upperfallback(s: Any) -> Any: try: if isinstance(s, localstr): u = s._utf8.decode("utf-8") @@ -395,14 +385,12 @@ else: ) -def colwidth(s): - # type: (bytes) -> int +def colwidth(s: bytes) -> int: """Find the column width of a string for display in the local encoding""" return ucolwidth(s.decode(_sysstr(encoding), 'replace')) -def ucolwidth(d): - # type: (Text) -> int +def ucolwidth(d: Text) -> int: """Find the column width of a Unicode string for display""" eaw = getattr(unicodedata, 'east_asian_width', None) if eaw is not None: @@ -410,8 +398,7 @@ def ucolwidth(d): return len(d) -def getcols(s, start, c): - # type: (bytes, int, int) -> bytes +def getcols(s: bytes, start: int, c: int) -> bytes: """Use colwidth to find a c-column substring of s starting at byte index start""" for x in range(start + c, len(s)): @@ -421,8 +408,12 @@ def getcols(s, start, c): raise ValueError('substring not found') -def trim(s, width, ellipsis=b'', leftside=False): - # type: (bytes, int, bytes, bool) -> bytes +def trim( + s: bytes, + width: int, + ellipsis: bytes = b'', + leftside: bool = False, +) -> bytes: """Trim string 's' to at most 'width' columns (including 
'ellipsis'). If 'leftside' is True, left side of string 's' is trimmed. @@ -540,8 +531,7 @@ class normcasespecs: other = 0 -def jsonescape(s, paranoid=False): - # type: (Any, Any) -> Any +def jsonescape(s: Any, paranoid: Any = False) -> Any: """returns a string suitable for JSON JSON is problematic for us because it doesn't support non-Unicode @@ -601,8 +591,7 @@ def jsonescape(s, paranoid=False): _utf8len = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 3, 4] -def getutf8char(s, pos): - # type: (bytes, int) -> bytes +def getutf8char(s: bytes, pos: int) -> bytes: """get the next full utf-8 character in the given string, starting at pos Raises a UnicodeError if the given location does not start a valid @@ -620,8 +609,7 @@ def getutf8char(s, pos): return c -def toutf8b(s): - # type: (bytes) -> bytes +def toutf8b(s: bytes) -> bytes: """convert a local, possibly-binary string into UTF-8b This is intended as a generic method to preserve data when working @@ -689,8 +677,7 @@ def toutf8b(s): return bytes(r) -def fromutf8b(s): - # type: (bytes) -> bytes +def fromutf8b(s: bytes) -> bytes: """Given a UTF-8b string, return a local, possibly-binary string. return the original binary string. This diff --git a/mercurial/error.py b/mercurial/error.py --- a/mercurial/error.py +++ b/mercurial/error.py @@ -40,8 +40,7 @@ assert [ ] -def _tobytes(exc): - # type: (...) 
-> bytes +def _tobytes(exc) -> bytes: """Byte-stringify exception in the same way as BaseException_str()""" if not exc.args: return b'' @@ -68,8 +67,7 @@ class Error(Hint, Exception): coarse_exit_code = None detailed_exit_code = None - def __init__(self, message, hint=None): - # type: (bytes, Optional[bytes]) -> None + def __init__(self, message: bytes, hint: Optional[bytes] = None) -> None: self.message = message self.hint = hint # Pass the message into the Exception constructor to help extensions @@ -79,15 +77,13 @@ class Error(Hint, Exception): def __bytes__(self): return self.message - def __str__(self): - # type: () -> str + def __str__(self) -> str: # the output would be unreadable if the message was translated, # but do not replace it with encoding.strfromlocal(), which # may raise another exception. return pycompat.sysstr(self.__bytes__()) - def format(self): - # type: () -> bytes + def format(self) -> bytes: from .i18n import _ message = _(b"abort: %s\n") % self.message @@ -114,8 +110,7 @@ class RevlogError(StorageError): class SidedataHashError(RevlogError): - def __init__(self, key, expected, got): - # type: (int, bytes, bytes) -> None + def __init__(self, key: int, expected: bytes, got: bytes) -> None: self.hint = None self.sidedatakey = key self.expecteddigest = expected @@ -127,8 +122,7 @@ class FilteredIndexError(IndexError): class LookupError(RevlogError, KeyError): - def __init__(self, name, index, message): - # type: (bytes, bytes, bytes) -> None + def __init__(self, name: bytes, index: bytes, message: bytes) -> None: self.name = name self.index = index # this can't be called 'message' because at least some installs of @@ -165,8 +159,7 @@ class ManifestLookupError(LookupError): class CommandError(Exception): """Exception raised on errors in parsing the command line.""" - def __init__(self, command, message): - # type: (Optional[bytes], bytes) -> None + def __init__(self, command: Optional[bytes], message: bytes) -> None: self.command = command 
self.message = message super(CommandError, self).__init__() @@ -177,8 +170,11 @@ class CommandError(Exception): class UnknownCommand(Exception): """Exception raised if command is not in the command table.""" - def __init__(self, command, all_commands=None): - # type: (bytes, Optional[List[bytes]]) -> None + def __init__( + self, + command: bytes, + all_commands: Optional[List[bytes]] = None, + ) -> None: self.command = command self.all_commands = all_commands super(UnknownCommand, self).__init__() @@ -189,8 +185,7 @@ class UnknownCommand(Exception): class AmbiguousCommand(Exception): """Exception raised if command shortcut matches more than one command.""" - def __init__(self, prefix, matches): - # type: (bytes, List[bytes]) -> None + def __init__(self, prefix: bytes, matches: List[bytes]) -> None: self.prefix = prefix self.matches = matches super(AmbiguousCommand, self).__init__() @@ -201,8 +196,7 @@ class AmbiguousCommand(Exception): class WorkerError(Exception): """Exception raised when a worker process dies.""" - def __init__(self, status_code): - # type: (int) -> None + def __init__(self, status_code: int) -> None: self.status_code = status_code # Pass status code to superclass just so it becomes part of __bytes__ super(WorkerError, self).__init__(status_code) @@ -216,8 +210,7 @@ class InterventionRequired(Abort): coarse_exit_code = 1 detailed_exit_code = 240 - def format(self): - # type: () -> bytes + def format(self) -> bytes: from .i18n import _ message = _(b"%s\n") % self.message @@ -229,8 +222,7 @@ class InterventionRequired(Abort): class ConflictResolutionRequired(InterventionRequired): """Exception raised when a continuable command required merge conflict resolution.""" - def __init__(self, opname): - # type: (bytes) -> None + def __init__(self, opname: bytes) -> None: from .i18n import _ self.opname = opname @@ -299,13 +291,16 @@ class ConfigError(Abort): detailed_exit_code = 30 - def __init__(self, message, location=None, hint=None): - # type: (bytes, 
Optional[bytes], Optional[bytes]) -> None + def __init__( + self, + message: bytes, + location: Optional[bytes] = None, + hint: Optional[bytes] = None, + ) -> None: super(ConfigError, self).__init__(message, hint=hint) self.location = location - def format(self): - # type: () -> bytes + def format(self) -> bytes: from .i18n import _ if self.location is not None: @@ -354,8 +349,11 @@ class RemoteError(Abort): class OutOfBandError(RemoteError): """Exception raised when a remote repo reports failure""" - def __init__(self, message=None, hint=None): - # type: (Optional[bytes], Optional[bytes]) -> None + def __init__( + self, + message: Optional[bytes] = None, + hint: Optional[bytes] = None, + ): from .i18n import _ if message: @@ -371,13 +369,16 @@ class ParseError(Abort): detailed_exit_code = 10 - def __init__(self, message, location=None, hint=None): - # type: (bytes, Optional[Union[bytes, int]], Optional[bytes]) -> None + def __init__( + self, + message: bytes, + location: Optional[Union[bytes, int]] = None, + hint: Optional[bytes] = None, + ): super(ParseError, self).__init__(message, hint=hint) self.location = location - def format(self): - # type: () -> bytes + def format(self) -> bytes: from .i18n import _ if self.location is not None: @@ -404,16 +405,14 @@ class PatchApplicationError(PatchError): __bytes__ = _tobytes -def getsimilar(symbols, value): - # type: (Iterable[bytes], bytes) -> List[bytes] +def getsimilar(symbols: Iterable[bytes], value: bytes) -> List[bytes]: sim = lambda x: difflib.SequenceMatcher(None, value, x).ratio() # The cutoff for similarity here is pretty arbitrary. It should # probably be investigated and tweaked. 
return [s for s in symbols if sim(s) > 0.6] -def similarity_hint(similar): - # type: (List[bytes]) -> Optional[bytes] +def similarity_hint(similar: List[bytes]) -> Optional[bytes]: from .i18n import _ if len(similar) == 1: @@ -428,8 +427,7 @@ def similarity_hint(similar): class UnknownIdentifier(ParseError): """Exception raised when a {rev,file}set references an unknown identifier""" - def __init__(self, function, symbols): - # type: (bytes, Iterable[bytes]) -> None + def __init__(self, function: bytes, symbols: Iterable[bytes]) -> None: from .i18n import _ similar = getsimilar(symbols, function) @@ -463,16 +461,14 @@ class RequirementError(RepoError): class StdioError(IOError): """Raised if I/O to stdout or stderr fails""" - def __init__(self, err): - # type: (IOError) -> None + def __init__(self, err: IOError) -> None: IOError.__init__(self, err.errno, err.strerror) # no __bytes__() because error message is derived from the standard IOError class UnsupportedMergeRecords(Abort): - def __init__(self, recordtypes): - # type: (Iterable[bytes]) -> None + def __init__(self, recordtypes: Iterable[bytes]) -> None: from .i18n import _ self.recordtypes = sorted(recordtypes) @@ -490,15 +486,24 @@ class UnsupportedMergeRecords(Abort): class UnknownVersion(Abort): """generic exception for aborting from an encounter with an unknown version""" - def __init__(self, msg, hint=None, version=None): - # type: (bytes, Optional[bytes], Optional[bytes]) -> None + def __init__( + self, + msg: bytes, + hint: Optional[bytes] = None, + version: Optional[bytes] = None, + ) -> None: self.version = version super(UnknownVersion, self).__init__(msg, hint=hint) class LockError(IOError): - def __init__(self, errno, strerror, filename, desc): - # _type: (int, str, bytes, bytes) -> None + def __init__( + self, + errno: int, + strerror: str, + filename: bytes, + desc: Optional[bytes], + ) -> None: IOError.__init__(self, errno, strerror, filename) self.desc = desc @@ -506,8 +511,15 @@ class 
LockError(IOError): class LockHeld(LockError): - def __init__(self, errno, filename, desc, locker): + def __init__( + self, + errno: int, + filename: bytes, + desc: Optional[bytes], + locker, + ): LockError.__init__(self, errno, 'Lock held', filename, desc) + self.filename: bytes = filename self.locker = locker @@ -544,8 +556,7 @@ class PushRaced(RuntimeError): class ProgrammingError(Hint, RuntimeError): """Raised if a mercurial (core or extension) developer made a mistake""" - def __init__(self, msg, *args, **kwargs): - # type: (AnyStr, Any, Any) -> None + def __init__(self, msg: AnyStr, *args, **kwargs): # On Python 3, turn the message back into a string since this is # an internal-only error that won't be printed except in a # stack traces. @@ -622,8 +633,7 @@ class CensoredNodeError(StorageError): Also contains the tombstone data substituted for the uncensored data. """ - def __init__(self, filename, node, tombstone): - # type: (bytes, bytes, bytes) -> None + def __init__(self, filename: bytes, node: bytes, tombstone: bytes): from .node import short StorageError.__init__(self, b'%s:%s' % (filename, short(node))) @@ -685,7 +695,10 @@ class WireprotoCommandError(Exception): The error is a formatter string and an optional iterable of arguments. """ - def __init__(self, message, args=None): - # type: (bytes, Optional[Sequence[bytes]]) -> None + def __init__( + self, + message: bytes, + args: Optional[Sequence[bytes]] = None, + ) -> None: self.message = message self.messageargs = args diff --git a/mercurial/hgweb/webcommands.py b/mercurial/hgweb/webcommands.py --- a/mercurial/hgweb/webcommands.py +++ b/mercurial/hgweb/webcommands.py @@ -516,8 +516,7 @@ def changeset(web): rev = webcommand(b'rev')(changeset) -def decodepath(path): - # type: (bytes) -> bytes +def decodepath(path: bytes) -> bytes: """Hook for mapping a path in the repository to a path in the working copy. 
diff --git a/mercurial/i18n.py b/mercurial/i18n.py --- a/mercurial/i18n.py +++ b/mercurial/i18n.py @@ -71,8 +71,7 @@ except AttributeError: _msgcache = {} # encoding: {message: translation} -def gettext(message): - # type: (bytes) -> bytes +def gettext(message: bytes) -> bytes: """Translate message. The message is looked up in the catalog to get a Unicode string, @@ -123,6 +122,10 @@ def _plain(): if _plain(): - _ = lambda message: message # type: Callable[[bytes], bytes] + + def _(message: bytes) -> bytes: + return message + + else: _ = gettext diff --git a/mercurial/logcmdutil.py b/mercurial/logcmdutil.py --- a/mercurial/logcmdutil.py +++ b/mercurial/logcmdutil.py @@ -789,8 +789,11 @@ class walkopts: limit = attr.ib(default=None) -def parseopts(ui, pats, opts): - # type: (Any, Sequence[bytes], Dict[bytes, Any]) -> walkopts +def parseopts( + ui: Any, + pats: Sequence[bytes], + opts: Dict[bytes, Any], +) -> walkopts: """Parse log command options into walkopts The returned walkopts will be passed in to getrevs() or makewalker(). @@ -1080,8 +1083,12 @@ def _initialrevs(repo, wopts): return revs -def makewalker(repo, wopts): - # type: (Any, walkopts) -> Tuple[smartset.abstractsmartset, Optional[Callable[[Any], matchmod.basematcher]]] +def makewalker( + repo: Any, + wopts: walkopts, +) -> Tuple[ + smartset.abstractsmartset, Optional[Callable[[Any], matchmod.basematcher]] +]: """Build (revs, makefilematcher) to scan revision/file history - revs is the smartset to be traversed. @@ -1131,8 +1138,10 @@ def makewalker(repo, wopts): return revs, filematcher -def getrevs(repo, wopts): - # type: (Any, walkopts) -> Tuple[smartset.abstractsmartset, Optional[changesetdiffer]] +def getrevs( + repo: Any, + wopts: walkopts, +) -> Tuple[smartset.abstractsmartset, Optional[changesetdiffer]]: """Return (revs, differ) where revs is a smartset differ is a changesetdiffer with pre-configured file matcher. 
diff --git a/mercurial/mail.py b/mercurial/mail.py --- a/mercurial/mail.py +++ b/mercurial/mail.py @@ -21,6 +21,7 @@ import time from typing import ( Any, List, + Optional, Tuple, Union, ) @@ -113,8 +114,7 @@ class SMTPS(smtplib.SMTP): return new_socket -def _pyhastls(): - # type: () -> bool +def _pyhastls() -> bool: """Returns true iff Python has TLS support, false otherwise.""" try: import ssl @@ -277,8 +277,7 @@ def validateconfig(ui): ) -def codec2iana(cs): - # type: (str) -> str +def codec2iana(cs: str) -> str: ''' ''' cs = email.charset.Charset(cs).input_charset.lower() @@ -288,8 +287,11 @@ def codec2iana(cs): return cs -def mimetextpatch(s, subtype='plain', display=False): - # type: (bytes, str, bool) -> email.message.Message +def mimetextpatch( + s: bytes, + subtype: str = 'plain', + display: bool = False, +) -> email.message.Message: """Return MIME message suitable for a patch. Charset will be detected by first trying to decode as us-ascii, then utf-8, and finally the global encodings. If all those fail, fall back to @@ -314,8 +316,9 @@ def mimetextpatch(s, subtype='plain', di return mimetextqp(s, subtype, "iso-8859-1") -def mimetextqp(body, subtype, charset): - # type: (bytes, str, str) -> email.message.Message +def mimetextqp( + body: bytes, subtype: str, charset: str +) -> email.message.Message: """Return MIME message. Quoted-printable transfer encoding will be used if necessary. """ @@ -340,8 +343,7 @@ def mimetextqp(body, subtype, charset): return msg -def _charsets(ui): - # type: (Any) -> List[str] +def _charsets(ui: Any) -> List[str]: '''Obtains charsets to send mail parts not containing patches.''' charsets = [ pycompat.sysstr(cs.lower()) @@ -358,8 +360,7 @@ def _charsets(ui): return [cs for cs in charsets if not cs.endswith('ascii')] -def _encode(ui, s, charsets): - # type: (Any, bytes, List[str]) -> Tuple[bytes, str] +def _encode(ui: Any, s: bytes, charsets: List[str]) -> Tuple[bytes, str]: """Returns (converted) string, charset tuple. 
Finds out best charset by cycling through sendcharsets in descending order. Tries both encoding and fallbackencoding for input. Only as @@ -409,8 +410,12 @@ def _encode(ui, s, charsets): return s, 'us-ascii' -def headencode(ui, s, charsets=None, display=False): - # type: (Any, Union[bytes, str], List[str], bool) -> str +def headencode( + ui: Any, + s: Union[bytes, str], + charsets: Optional[List[str]] = None, + display: bool = False, +) -> str: '''Returns RFC-2047 compliant header from given string.''' if not display: # split into words? @@ -419,8 +424,9 @@ def headencode(ui, s, charsets=None, dis return encoding.strfromlocal(s) -def _addressencode(ui, name, addr, charsets=None): - # type: (Any, str, str, List[str]) -> str +def _addressencode( + ui: Any, name: str, addr: str, charsets: Optional[List[str]] = None +) -> str: addr = encoding.strtolocal(addr) name = headencode(ui, name, charsets) try: @@ -439,8 +445,12 @@ def _addressencode(ui, name, addr, chars return email.utils.formataddr((name, encoding.strfromlocal(addr))) -def addressencode(ui, address, charsets=None, display=False): - # type: (Any, bytes, List[str], bool) -> str +def addressencode( + ui: Any, + address: bytes, + charsets: Optional[List[str]] = None, + display: bool = False, +) -> str: '''Turns address into RFC-2047 compliant header.''' if display or not address: return encoding.strfromlocal(address or b'') @@ -448,8 +458,12 @@ def addressencode(ui, address, charsets= return _addressencode(ui, name, addr, charsets) -def addrlistencode(ui, addrs, charsets=None, display=False): - # type: (Any, List[bytes], List[str], bool) -> List[str] +def addrlistencode( + ui: Any, + addrs: List[bytes], + charsets: Optional[List[str]] = None, + display: bool = False, +) -> List[str]: """Turns a list of addresses into a list of RFC-2047 compliant headers. 
A single element of input list may contain multiple addresses, but output always has one address per item""" @@ -468,8 +482,12 @@ def addrlistencode(ui, addrs, charsets=N return result -def mimeencode(ui, s, charsets=None, display=False): - # type: (Any, bytes, List[str], bool) -> email.message.Message +def mimeencode( + ui: Any, + s: bytes, + charsets: Optional[List[str]] = None, + display: bool = False, +) -> email.message.Message: """creates mime text object, encodes it if needed, and sets charset and transfer-encoding accordingly.""" cs = 'us-ascii' @@ -481,8 +499,7 @@ def mimeencode(ui, s, charsets=None, dis Generator = email.generator.BytesGenerator -def parse(fp): - # type: (Any) -> email.message.Message +def parse(fp: Any) -> email.message.Message: ep = email.parser.Parser() # disable the "universal newlines" mode, which isn't binary safe. # I have no idea if ascii/surrogateescape is correct, but that's @@ -496,14 +513,12 @@ def parse(fp): fp.detach() -def parsebytes(data): - # type: (bytes) -> email.message.Message +def parsebytes(data: bytes) -> email.message.Message: ep = email.parser.BytesParser() return ep.parsebytes(data) -def headdecode(s): - # type: (Union[email.header.Header, bytes]) -> bytes +def headdecode(s: Union[email.header.Header, bytes]) -> bytes: '''Decodes RFC-2047 header''' uparts = [] for part, charset in email.header.decode_header(s): diff --git a/mercurial/pathutil.py b/mercurial/pathutil.py --- a/mercurial/pathutil.py +++ b/mercurial/pathutil.py @@ -32,8 +32,7 @@ assert [ ] -def _lowerclean(s): - # type: (bytes) -> bytes +def _lowerclean(s: bytes) -> bytes: return encoding.hfsignoreclean(s.lower()) @@ -72,8 +71,7 @@ class pathauditor: else: self.normcase = lambda x: x - def __call__(self, path, mode=None): - # type: (bytes, Optional[Any]) -> None + def __call__(self, path: bytes, mode: Optional[Any] = None) -> None: """Check the relative path. path may contain a pattern (e.g. 
foodir/**.txt)""" @@ -170,8 +168,7 @@ class pathauditor: raise error.Abort(msg % (path, pycompat.bytestr(prefix))) return True - def check(self, path): - # type: (bytes) -> bool + def check(self, path: bytes) -> bool: try: self(path) return True @@ -192,8 +189,12 @@ class pathauditor: self._cached = False -def canonpath(root, cwd, myname, auditor=None): - # type: (bytes, bytes, bytes, Optional[pathauditor]) -> bytes +def canonpath( + root: bytes, + cwd: bytes, + myname: bytes, + auditor: Optional[pathauditor] = None, +) -> bytes: """return the canonical path of myname, given cwd and root >>> def check(root, cwd, myname): @@ -295,8 +296,7 @@ def canonpath(root, cwd, myname, auditor ) -def normasprefix(path): - # type: (bytes) -> bytes +def normasprefix(path: bytes) -> bytes: """normalize the specified path as path prefix Returned value can be used safely for "p.startswith(prefix)", @@ -319,8 +319,7 @@ def normasprefix(path): return path -def finddirs(path): - # type: (bytes) -> Iterator[bytes] +def finddirs(path: bytes) -> Iterator[bytes]: pos = path.rfind(b'/') while pos != -1: yield path[:pos] @@ -355,8 +354,7 @@ class dirs: for f in map: addpath(f) - def addpath(self, path): - # type: (bytes) -> None + def addpath(self, path: bytes) -> None: dirs = self._dirs for base in finddirs(path): if base.endswith(b'/'): @@ -368,8 +366,7 @@ class dirs: return dirs[base] = 1 - def delpath(self, path): - # type: (bytes) -> None + def delpath(self, path: bytes) -> None: dirs = self._dirs for base in finddirs(path): if dirs[base] > 1: @@ -380,8 +377,7 @@ class dirs: def __iter__(self): return iter(self._dirs) - def __contains__(self, d): - # type: (bytes) -> bool + def __contains__(self, d: bytes) -> bool: return d in self._dirs diff --git a/mercurial/phases.py b/mercurial/phases.py --- a/mercurial/phases.py +++ b/mercurial/phases.py @@ -192,20 +192,20 @@ all_internal_phases = tuple(p for p in a no_bundle_phases = all_internal_phases -def supportinternal(repo): - # type: 
(localrepo.localrepository) -> bool +def supportinternal(repo: "localrepo.localrepository") -> bool: """True if the internal phase can be used on a repository""" return requirements.INTERNAL_PHASE_REQUIREMENT in repo.requirements -def supportarchived(repo): - # type: (localrepo.localrepository) -> bool +def supportarchived(repo: "localrepo.localrepository") -> bool: """True if the archived phase can be used on a repository""" return requirements.ARCHIVED_PHASE_REQUIREMENT in repo.requirements -def _readroots(repo, phasedefaults=None): - # type: (localrepo.localrepository, Optional[Phasedefaults]) -> Tuple[Phaseroots, bool] +def _readroots( + repo: "localrepo.localrepository", + phasedefaults: Optional["Phasedefaults"] = None, +) -> Tuple[Phaseroots, bool]: """Read phase roots from disk phasedefaults is a list of fn(repo, roots) callable, which are @@ -235,8 +235,7 @@ def _readroots(repo, phasedefaults=None) return roots, dirty -def binaryencode(phasemapping): - # type: (Dict[int, List[bytes]]) -> bytes +def binaryencode(phasemapping: Dict[int, List[bytes]]) -> bytes: """encode a 'phase -> nodes' mapping into a binary stream The revision lists are encoded as (phase, root) pairs. @@ -248,8 +247,7 @@ def binaryencode(phasemapping): return b''.join(binarydata) -def binarydecode(stream): - # type: (...) 
-> Dict[int, List[bytes]] +def binarydecode(stream) -> Dict[int, List[bytes]]: """decode a binary stream into a 'phase -> nodes' mapping The (phase, root) pairs are turned back into a dictionary with @@ -367,8 +365,12 @@ def _trackphasechange(data, rev, old, ne class phasecache: - def __init__(self, repo, phasedefaults, _load=True): - # type: (localrepo.localrepository, Optional[Phasedefaults], bool) -> None + def __init__( + self, + repo: "localrepo.localrepository", + phasedefaults: Optional["Phasedefaults"], + _load: bool = True, + ): if _load: # Cheap trick to allow shallow-copy without copy module self.phaseroots, self.dirty = _readroots(repo, phasedefaults) @@ -377,8 +379,7 @@ class phasecache: self.filterunknown(repo) self.opener = repo.svfs - def hasnonpublicphases(self, repo): - # type: (localrepo.localrepository) -> bool + def hasnonpublicphases(self, repo: "localrepo.localrepository") -> bool: """detect if there are revisions with non-public phase""" repo = repo.unfiltered() cl = repo.changelog @@ -389,8 +390,9 @@ class phasecache: revs for phase, revs in self.phaseroots.items() if phase != public ) - def nonpublicphaseroots(self, repo): - # type: (localrepo.localrepository) -> Set[bytes] + def nonpublicphaseroots( + self, repo: "localrepo.localrepository" + ) -> Set[bytes]: """returns the roots of all non-public phases The roots are not minimized, so if the secret revisions are @@ -409,8 +411,12 @@ class phasecache: ] ) - def getrevset(self, repo, phases, subset=None): - # type: (localrepo.localrepository, Iterable[int], Optional[Any]) -> Any + def getrevset( + self, + repo: "localrepo.localrepository", + phases: Iterable[int], + subset: Optional[Any] = None, + ) -> Any: # TODO: finish typing this """return a smartset for the given phases""" self.loadphaserevs(repo) # ensure phase's sets are loaded @@ -506,8 +512,7 @@ class phasecache: self._phasesets[phase] = ps self._loadedrevslen = len(cl) - def loadphaserevs(self, repo): - # type: 
(localrepo.localrepository) -> None + def loadphaserevs(self, repo: "localrepo.localrepository") -> None: """ensure phase information is loaded in the object""" if self._phasesets is None: try: @@ -520,8 +525,7 @@ class phasecache: self._loadedrevslen = 0 self._phasesets = None - def phase(self, repo, rev): - # type: (localrepo.localrepository, int) -> int + def phase(self, repo: "localrepo.localrepository", rev: int) -> int: # We need a repo argument here to be able to build _phasesets # if necessary. The repository instance is not stored in # phasecache to avoid reference cycles. The changelog instance @@ -708,8 +712,7 @@ class phasecache: return True return False - def filterunknown(self, repo): - # type: (localrepo.localrepository) -> None + def filterunknown(self, repo: "localrepo.localrepository") -> None: """remove unknown nodes from the phase boundary Nothing is lost as unknown nodes only hold data for their descendants. @@ -786,8 +789,7 @@ def registernew(repo, tr, targetphase, r repo._phasecache.replace(phcache) -def listphases(repo): - # type: (localrepo.localrepository) -> Dict[bytes, bytes] +def listphases(repo: "localrepo.localrepository") -> Dict[bytes, bytes]: """List phases root for serialization over pushkey""" # Use ordered dictionary so behavior is deterministic. 
keys = util.sortdict() @@ -818,8 +820,12 @@ def listphases(repo): return keys -def pushphase(repo, nhex, oldphasestr, newphasestr): - # type: (localrepo.localrepository, bytes, bytes, bytes) -> bool +def pushphase( + repo: "localrepo.localrepository", + nhex: bytes, + oldphasestr: bytes, + newphasestr: bytes, +) -> bool: """List phases root for serialization over pushkey""" repo = repo.unfiltered() with repo.lock(): @@ -966,8 +972,7 @@ def newheads(repo, heads, roots): return pycompat.maplist(cl.node, sorted(new_heads)) -def newcommitphase(ui): - # type: (uimod.ui) -> int +def newcommitphase(ui: "uimod.ui") -> int: """helper to get the target phase of new commit Handle all possible values for the phases.new-commit options. @@ -982,14 +987,16 @@ def newcommitphase(ui): ) -def hassecret(repo): - # type: (localrepo.localrepository) -> bool +def hassecret(repo: "localrepo.localrepository") -> bool: """utility function that check if a repo have any secret changeset.""" return bool(repo._phasecache.phaseroots[secret]) -def preparehookargs(node, old, new): - # type: (bytes, Optional[int], Optional[int]) -> Dict[bytes, bytes] +def preparehookargs( + node: bytes, + old: Optional[int], + new: Optional[int], +) -> Dict[bytes, bytes]: if old is None: old = b'' else: diff --git a/mercurial/pvec.py b/mercurial/pvec.py --- a/mercurial/pvec.py +++ b/mercurial/pvec.py @@ -72,8 +72,7 @@ def _bin(bs): return v -def _str(v, l): - # type: (int, int) -> bytes +def _str(v: int, l: int) -> bytes: bs = b"" for p in range(l): bs = pycompat.bytechr(v & 255) + bs diff --git a/mercurial/state.py b/mercurial/state.py --- a/mercurial/state.py +++ b/mercurial/state.py @@ -59,8 +59,7 @@ class cmdstate: self._repo = repo self.fname = fname - def read(self): - # type: () -> Dict[bytes, Any] + def read(self) -> Dict[bytes, Any]: """read the existing state file and return a dict of data stored""" return self._read() diff --git a/mercurial/subrepoutil.py b/mercurial/subrepoutil.py --- 
a/mercurial/subrepoutil.py +++ b/mercurial/subrepoutil.py @@ -69,8 +69,7 @@ if typing.TYPE_CHECKING: Substate = Dict[bytes, Tuple[bytes, bytes, bytes]] -def state(ctx, ui): - # type: (context.changectx, uimod.ui) -> Substate +def state(ctx: "context.changectx", ui: "uimod.ui") -> Substate: """return a state dict, mapping subrepo paths configured in .hgsub to tuple: (source from .hgsub, revision from .hgsubstate, kind (key in types dict)) @@ -122,8 +121,7 @@ def state(ctx, ui): except FileNotFoundError: pass - def remap(src): - # type: (bytes) -> bytes + def remap(src: bytes) -> bytes: for pattern, repl in p.items(b'subpaths'): # Turn r'C:\foo\bar' into r'C:\\foo\\bar' since re.sub # does a string decode. @@ -175,8 +173,7 @@ def state(ctx, ui): return state -def writestate(repo, state): - # type: (localrepo.localrepository, Substate) -> None +def writestate(repo: "localrepo.localrepository", state: Substate) -> None: """rewrite .hgsubstate in (outer) repo with these subrepo states""" lines = [ b'%s %s\n' % (state[s][1], s) @@ -186,8 +183,14 @@ def writestate(repo, state): repo.wwrite(b'.hgsubstate', b''.join(lines), b'') -def submerge(repo, wctx, mctx, actx, overwrite, labels=None): - # type: (localrepo.localrepository, context.workingctx, context.changectx, context.changectx, bool, Optional[Any]) -> Substate +def submerge( + repo: "localrepo.localrepository", + wctx: "context.workingctx", + mctx: "context.changectx", + actx: "context.changectx", + overwrite: bool, + labels: Optional[Any] = None, +) -> Substate: # TODO: type the `labels` arg """delegated from merge.applyupdates: merging of .hgsubstate file in working context, merging context and ancestor context""" @@ -327,8 +330,13 @@ def submerge(repo, wctx, mctx, actx, ove return sm -def precommit(ui, wctx, status, match, force=False): - # type: (uimod.ui, context.workingcommitctx, scmutil.status, matchmod.basematcher, bool) -> Tuple[List[bytes], Set[bytes], Substate] +def precommit( + ui: "uimod.ui", + wctx: 
"context.workingcommitctx", + status: "scmutil.status", + match: "matchmod.basematcher", + force: bool = False, +) -> Tuple[List[bytes], Set[bytes], Substate]: """Calculate .hgsubstate changes that should be applied before committing Returns (subs, commitsubs, newstate) where @@ -416,8 +424,7 @@ def repo_rel_or_abs_source(repo): return posixpath.normpath(path) -def reporelpath(repo): - # type: (localrepo.localrepository) -> bytes +def reporelpath(repo: "localrepo.localrepository") -> bytes: """return path to this (sub)repo as seen from outermost repo""" parent = repo while hasattr(parent, '_subparent'): @@ -425,14 +432,16 @@ def reporelpath(repo): return repo.root[len(pathutil.normasprefix(parent.root)) :] -def subrelpath(sub): - # type: (subrepo.abstractsubrepo) -> bytes +def subrelpath(sub: "subrepo.abstractsubrepo") -> bytes: """return path to this subrepo as seen from outermost repo""" return sub._relpath -def _abssource(repo, push=False, abort=True): - # type: (localrepo.localrepository, bool, bool) -> Optional[bytes] +def _abssource( + repo: "localrepo.localrepository", + push: bool = False, + abort: bool = True, +) -> Optional[bytes]: """return pull/push path of repo - either based on parent repo .hgsub info or on the top repo config. Abort or return None if no source found.""" if hasattr(repo, '_subparent'): @@ -480,8 +489,7 @@ def _abssource(repo, push=False, abort=T raise error.Abort(_(b"default path for subrepository not found")) -def newcommitphase(ui, ctx): - # type: (uimod.ui, context.changectx) -> int +def newcommitphase(ui: "uimod.ui", ctx: "context.changectx") -> int: commitphase = phases.newcommitphase(ui) substate = getattr(ctx, "substate", None) if not substate: diff --git a/mercurial/util.py b/mercurial/util.py --- a/mercurial/util.py +++ b/mercurial/util.py @@ -147,8 +147,7 @@ unlink = platform.unlink username = platform.username -def setumask(val): - # type: (int) -> None +def setumask(val: int) -> None: '''updates the umask. 
used by chg server''' if pycompat.iswindows: return @@ -1850,8 +1849,7 @@ if pycompat.ispypy: nogc = lambda x: x -def pathto(root, n1, n2): - # type: (bytes, bytes, bytes) -> bytes +def pathto(root: bytes, n1: bytes, n2: bytes) -> bytes: """return the relative path from one place to another. root should use os.sep to separate directories n1 should use os.sep to separate directories @@ -2062,8 +2060,7 @@ def copyfiles(src, dst, hardlink=None, p _winreservedchars = b':*?"<>|' -def checkwinfilename(path): - # type: (bytes) -> Optional[bytes] +def checkwinfilename(path: bytes) -> Optional[bytes]: r"""Check that the base-relative path is a valid filename on Windows. Returns None if the path is ok, or a UI string describing the problem. @@ -2157,8 +2154,7 @@ def makelock(info, pathname): os.close(ld) -def readlock(pathname): - # type: (bytes) -> bytes +def readlock(pathname: bytes) -> bytes: try: return readlink(pathname) except OSError as why: @@ -2181,8 +2177,7 @@ def fstat(fp): # File system features -def fscasesensitive(path): - # type: (bytes) -> bool +def fscasesensitive(path: bytes) -> bool: """ Return true if the given path is on a case-sensitive filesystem @@ -2286,8 +2281,7 @@ re = _re() _fspathcache = {} -def fspath(name, root): - # type: (bytes, bytes) -> bytes +def fspath(name: bytes, root: bytes) -> bytes: """Get name in the case stored in the filesystem The name should be relative to root, and be normcase-ed for efficiency. 
@@ -2331,8 +2325,7 @@ def fspath(name, root): return b''.join(result) -def checknlink(testfile): - # type: (bytes) -> bool +def checknlink(testfile: bytes) -> bool: '''check whether hardlink count reporting works properly''' # testfile may be open, so we need a separate file for checking to @@ -2365,8 +2358,7 @@ def checknlink(testfile): pass -def endswithsep(path): - # type: (bytes) -> bool +def endswithsep(path: bytes) -> bool: '''Check path ends with os.sep or os.altsep.''' return bool( # help pytype path.endswith(pycompat.ossep) @@ -2375,8 +2367,7 @@ def endswithsep(path): ) -def splitpath(path): - # type: (bytes) -> List[bytes] +def splitpath(path: bytes) -> List[bytes]: """Split path by os.sep. Note that this function does not use os.altsep because this is an alternative of simple "xxx.split(os.sep)". @@ -2609,8 +2600,9 @@ def tryrmdir(f): raise -def unlinkpath(f, ignoremissing=False, rmdir=True): - # type: (bytes, bool, bool) -> None +def unlinkpath( + f: bytes, ignoremissing: bool = False, rmdir: bool = True +) -> None: """unlink and remove the directory if it is empty""" if ignoremissing: tryunlink(f) @@ -2624,8 +2616,7 @@ def unlinkpath(f, ignoremissing=False, r pass -def tryunlink(f): - # type: (bytes) -> None +def tryunlink(f: bytes) -> None: """Attempt to remove a file, ignoring FileNotFoundError.""" try: unlink(f) @@ -2633,8 +2624,9 @@ def tryunlink(f): pass -def makedirs(name, mode=None, notindexed=False): - # type: (bytes, Optional[int], bool) -> None +def makedirs( + name: bytes, mode: Optional[int] = None, notindexed: bool = False +) -> None: """recursive directory creation with parent mode inheritance Newly created directories are marked as "not to be indexed by @@ -2663,20 +2655,17 @@ def makedirs(name, mode=None, notindexed os.chmod(name, mode) -def readfile(path): - # type: (bytes) -> bytes +def readfile(path: bytes) -> bytes: with open(path, b'rb') as fp: return fp.read() -def writefile(path, text): - # type: (bytes, bytes) -> None +def 
writefile(path: bytes, text: bytes) -> None: with open(path, b'wb') as fp: fp.write(text) -def appendfile(path, text): - # type: (bytes, bytes) -> None +def appendfile(path: bytes, text: bytes) -> None: with open(path, b'ab') as fp: fp.write(text) @@ -2837,8 +2826,7 @@ def unitcountfn(*unittable): return go -def processlinerange(fromline, toline): - # type: (int, int) -> Tuple[int, int] +def processlinerange(fromline: int, toline: int) -> Tuple[int, int]: """Check that linerange : makes sense and return a 0-based range. @@ -2897,13 +2885,11 @@ class transformingwriter: _eolre = remod.compile(br'\r*\n') -def tolf(s): - # type: (bytes) -> bytes +def tolf(s: bytes) -> bytes: return _eolre.sub(b'\n', s) -def tocrlf(s): - # type: (bytes) -> bytes +def tocrlf(s: bytes) -> bytes: return _eolre.sub(b'\r\n', s) @@ -2926,15 +2912,13 @@ def iterfile(fp): return fp -def iterlines(iterator): - # type: (Iterable[bytes]) -> Iterator[bytes] +def iterlines(iterator: Iterable[bytes]) -> Iterator[bytes]: for chunk in iterator: for line in chunk.splitlines(): yield line -def expandpath(path): - # type: (bytes) -> bytes +def expandpath(path: bytes) -> bytes: return os.path.expanduser(os.path.expandvars(path)) @@ -3062,8 +3046,7 @@ def timed(func): ) -def sizetoint(s): - # type: (bytes) -> int +def sizetoint(s: bytes) -> int: """Convert a space specifier to a byte count. >>> sizetoint(b'30') @@ -3285,8 +3268,7 @@ def with_lc_ctype(): yield -def _estimatememory(): - # type: () -> Optional[int] +def _estimatememory() -> Optional[int]: """Provide an estimate for the available system memory in Bytes. If no estimate can be provided on the platform, returns None. 
diff --git a/mercurial/utils/dateutil.py b/mercurial/utils/dateutil.py --- a/mercurial/utils/dateutil.py +++ b/mercurial/utils/dateutil.py @@ -81,8 +81,7 @@ extendeddateformats = defaultdateformats ) -def makedate(timestamp=None): - # type: (Optional[float]) -> hgdate +def makedate(timestamp: Optional[float] = None) -> hgdate: """Return a unix timestamp (or the current time) as a (unixtime, offset) tuple based off the local timezone.""" if timestamp is None: @@ -103,8 +102,10 @@ def makedate(timestamp=None): return timestamp, tz -def datestr(date=None, format=b'%a %b %d %H:%M:%S %Y %1%2'): - # type: (Optional[hgdate], bytes) -> bytes +def datestr( + date: Optional[hgdate] = None, + format: bytes = b'%a %b %d %H:%M:%S %Y %1%2', +) -> bytes: """represent a (unixtime, offset) tuple as a localized time. unixtime is seconds since the epoch, and offset is the time zone's number of seconds away from UTC. @@ -141,14 +142,12 @@ def datestr(date=None, format=b'%a %b %d return s -def shortdate(date=None): - # type: (Optional[hgdate]) -> bytes +def shortdate(date: Optional[hgdate] = None) -> bytes: """turn (timestamp, tzoff) tuple into iso 8631 date.""" return datestr(date, format=b'%Y-%m-%d') -def parsetimezone(s): - # type: (bytes) -> Tuple[Optional[int], bytes] +def parsetimezone(s: bytes) -> Tuple[Optional[int], bytes]: """find a trailing timezone, if any, in string, and return a (offset, remainder) pair""" s = pycompat.bytestr(s) @@ -183,8 +182,11 @@ def parsetimezone(s): return None, s -def strdate(string, format, defaults=None): - # type: (bytes, bytes, Optional[Dict[bytes, Tuple[bytes, bytes]]]) -> hgdate +def strdate( + string: bytes, + format: bytes, + defaults: Optional[Dict[bytes, Tuple[bytes, bytes]]] = None, +) -> hgdate: """parse a localized time string and return a (unixtime, offset) tuple. 
if the string cannot be parsed, ValueError is raised.""" if defaults is None: @@ -226,8 +228,11 @@ def strdate(string, format, defaults=Non return unixtime, offset -def parsedate(date, formats=None, bias=None): - # type: (Union[bytes, hgdate], Optional[Iterable[bytes]], Optional[Dict[bytes, bytes]]) -> hgdate +def parsedate( + date: Union[bytes, hgdate], + formats: Optional[Iterable[bytes]] = None, + bias: Optional[Dict[bytes, bytes]] = None, +) -> hgdate: """parse a localized date/time and return a (unixtime, offset) tuple. The date may be a "unixtime offset" string or in one of the specified @@ -316,8 +321,7 @@ def parsedate(date, formats=None, bias=N return when, offset -def matchdate(date): - # type: (bytes) -> Callable[[float], bool] +def matchdate(date: bytes) -> Callable[[float], bool]: """Return a function that matches a given date match specifier Formats include: @@ -346,13 +350,11 @@ def matchdate(date): False """ - def lower(date): - # type: (bytes) -> float + def lower(date: bytes) -> float: d = {b'mb': b"1", b'd': b"1"} return parsedate(date, extendeddateformats, d)[0] - def upper(date): - # type: (bytes) -> float + def upper(date: bytes) -> float: d = {b'mb': b"12", b'HI': b"23", b'M': b"59", b'S': b"59"} for days in (b"31", b"30", b"29"): try: diff --git a/mercurial/utils/urlutil.py b/mercurial/utils/urlutil.py --- a/mercurial/utils/urlutil.py +++ b/mercurial/utils/urlutil.py @@ -34,8 +34,7 @@ assert [Union] urlreq = urllibcompat.urlreq -def getport(port): - # type: (Union[bytes, int]) -> int +def getport(port: Union[bytes, int]) -> int: """Return the port for a given network service. If port is an integer, it's returned as is. 
If it's a string, it's @@ -133,8 +132,12 @@ class url: _safepchars = b"/!~*'()+:\\" _matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match - def __init__(self, path, parsequery=True, parsefragment=True): - # type: (bytes, bool, bool) -> None + def __init__( + self, + path: bytes, + parsequery: bool = True, + parsefragment: bool = True, + ) -> None: # We slowly chomp away at path until we have only the path left self.scheme = self.user = self.passwd = self.host = None self.port = self.path = self.query = self.fragment = None @@ -378,8 +381,7 @@ class url: return True # POSIX-style return False - def localpath(self): - # type: () -> bytes + def localpath(self) -> bytes: if self.scheme == b'file' or self.scheme == b'bundle': path = self.path or b'/' # For Windows, we need to promote hosts containing drive @@ -402,23 +404,19 @@ class url: ) -def hasscheme(path): - # type: (bytes) -> bool +def hasscheme(path: bytes) -> bool: return bool(url(path).scheme) # cast to help pytype -def hasdriveletter(path): - # type: (bytes) -> bool +def hasdriveletter(path: bytes) -> bool: return bool(path) and path[1:2] == b':' and path[0:1].isalpha() -def urllocalpath(path): - # type: (bytes) -> bytes +def urllocalpath(path: bytes) -> bytes: return url(path, parsequery=False, parsefragment=False).localpath() -def checksafessh(path): - # type: (bytes) -> None +def checksafessh(path: bytes) -> None: """check if a path / url is a potentially unsafe ssh exploit (SEC) This is a sanity check for ssh urls. ssh will parse the first item as @@ -435,8 +433,7 @@ def checksafessh(path): ) -def hidepassword(u): - # type: (bytes) -> bytes +def hidepassword(u: bytes) -> bytes: '''hide user credential in a url string''' u = url(u) if u.passwd: @@ -444,8 +441,7 @@ def hidepassword(u): return bytes(u) -def removeauth(u): - # type: (bytes) -> bytes +def removeauth(u: bytes) -> bytes: '''remove all authentication information from a url string''' u = url(u) u.user = u.passwd = None