- {date|age} |
+ {age(date)} |
{author|person} |
- {desc|strip|firstline|escape|nonempty}{inbranch%changelogbranchname}{branches%changelogbranchhead}{tags%changelogtag} |
+ {desc|strip|firstline|escape|nonempty}{inbranch%changelogbranchname}{branches%changelogbranchhead}{tags % '{name|escape} '} |
diff --git a/mercurial/transaction.py b/mercurial/transaction.py
--- a/mercurial/transaction.py
+++ b/mercurial/transaction.py
@@ -13,7 +13,7 @@
from i18n import _
import os, errno
-import error
+import error, util
def active(func):
def _active(self, *args, **kwds):
@@ -27,18 +27,22 @@ def _playback(journal, report, opener, e
for f, o, ignore in entries:
if o or not unlink:
try:
- opener(f, 'a').truncate(o)
+ fp = opener(f, 'a')
+ fp.truncate(o)
+ fp.close()
except IOError:
report(_("failed to truncate %s\n") % f)
raise
else:
try:
- fn = opener(f).name
- os.unlink(fn)
+ fp = opener(f)
+ fn = fp.name
+ fp.close()
+ util.unlink(fn)
except (IOError, OSError), inst:
if inst.errno != errno.ENOENT:
raise
- os.unlink(journal)
+ util.unlink(journal)
class transaction(object):
def __init__(self, report, opener, journal, after=None, createmode=None):
@@ -52,7 +56,7 @@ class transaction(object):
self.journal = journal
self._queue = []
- self.file = open(self.journal, "w")
+ self.file = util.posixfile(self.journal, "w")
if createmode is not None:
os.chmod(self.journal, createmode & 0666)
@@ -133,7 +137,7 @@ class transaction(object):
if self.after:
self.after()
if os.path.isfile(self.journal):
- os.unlink(self.journal)
+ util.unlink(self.journal)
self.journal = None
@active
@@ -151,7 +155,7 @@ class transaction(object):
try:
if not self.entries:
if self.journal:
- os.unlink(self.journal)
+ util.unlink(self.journal)
return
self.report(_("transaction abort!\n"))
@@ -169,7 +173,10 @@ class transaction(object):
def rollback(opener, file, report):
entries = []
- for l in open(file).readlines():
+ fp = util.posixfile(file)
+ lines = fp.readlines()
+ fp.close()
+ for l in lines:
f, o = l.split('\0')
entries.append((f, int(o), None))
diff --git a/mercurial/ui.py b/mercurial/ui.py
--- a/mercurial/ui.py
+++ b/mercurial/ui.py
@@ -153,6 +153,16 @@ class ui(object):
"%s.%s = %s\n") % (section, name, uvalue))
return value
+ def configpath(self, section, name, default=None, untrusted=False):
+ 'get a path config item, expanded relative to config file'
+ v = self.config(section, name, default, untrusted)
+ if not os.path.isabs(v) or "://" not in v:
+ src = self.configsource(section, name, untrusted)
+ if ':' in src:
+ base = os.path.dirname(src.rsplit(':'))
+ v = os.path.join(base, os.path.expanduser(v))
+ return v
+
def configbool(self, section, name, default=False, untrusted=False):
v = self.config(section, name, None, untrusted)
if v is None:
@@ -589,7 +599,7 @@ class ui(object):
termination.
'''
- if pos == None or not self.debugflag:
+ if pos is None or not self.debugflag:
return
if unit:
diff --git a/mercurial/url.py b/mercurial/url.py
--- a/mercurial/url.py
+++ b/mercurial/url.py
@@ -71,6 +71,38 @@ def netlocunsplit(host, port, user=None,
return userpass + '@' + hostport
return hostport
+def readauthforuri(ui, uri):
+ # Read configuration
+ config = dict()
+ for key, val in ui.configitems('auth'):
+ if '.' not in key:
+ ui.warn(_("ignoring invalid [auth] key '%s'\n") % key)
+ continue
+ group, setting = key.rsplit('.', 1)
+ gdict = config.setdefault(group, dict())
+ if setting in ('username', 'cert', 'key'):
+ val = util.expandpath(val)
+ gdict[setting] = val
+
+ # Find the best match
+ scheme, hostpath = uri.split('://', 1)
+ bestlen = 0
+ bestauth = None
+ for group, auth in config.iteritems():
+ prefix = auth.get('prefix')
+ if not prefix:
+ continue
+ p = prefix.split('://', 1)
+ if len(p) > 1:
+ schemes, prefix = [p[0]], p[1]
+ else:
+ schemes = (auth.get('schemes') or 'https').split()
+ if (prefix == '*' or hostpath.startswith(prefix)) and \
+ len(prefix) > bestlen and scheme in schemes:
+ bestlen = len(prefix)
+ bestauth = group, auth
+ return bestauth
+
_safe = ('abcdefghijklmnopqrstuvwxyz'
'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
'0123456789' '_.-/')
@@ -123,9 +155,11 @@ class passwordmgr(urllib2.HTTPPasswordMg
return (user, passwd)
if not user:
- auth = self.readauthtoken(authuri)
- if auth:
+ res = readauthforuri(self.ui, authuri)
+ if res:
+ group, auth = res
user, passwd = auth.get('username'), auth.get('password')
+ self.ui.debug("using auth.%s.* for authentication\n" % group)
if not user or not passwd:
if not self.ui.interactive():
raise util.Abort(_('http authorization required'))
@@ -148,38 +182,6 @@ class passwordmgr(urllib2.HTTPPasswordMg
msg = _('http auth: user %s, password %s\n')
self.ui.debug(msg % (user, passwd and '*' * len(passwd) or 'not set'))
- def readauthtoken(self, uri):
- # Read configuration
- config = dict()
- for key, val in self.ui.configitems('auth'):
- if '.' not in key:
- self.ui.warn(_("ignoring invalid [auth] key '%s'\n") % key)
- continue
- group, setting = key.split('.', 1)
- gdict = config.setdefault(group, dict())
- if setting in ('username', 'cert', 'key'):
- val = util.expandpath(val)
- gdict[setting] = val
-
- # Find the best match
- scheme, hostpath = uri.split('://', 1)
- bestlen = 0
- bestauth = None
- for auth in config.itervalues():
- prefix = auth.get('prefix')
- if not prefix:
- continue
- p = prefix.split('://', 1)
- if len(p) > 1:
- schemes, prefix = [p[0]], p[1]
- else:
- schemes = (auth.get('schemes') or 'https').split()
- if (prefix == '*' or hostpath.startswith(prefix)) and \
- len(prefix) > bestlen and scheme in schemes:
- bestlen = len(prefix)
- bestauth = auth
- return bestauth
-
class proxyhandler(urllib2.ProxyHandler):
def __init__(self, ui):
proxyurl = ui.config("http_proxy", "host") or os.getenv('http_proxy')
@@ -258,29 +260,47 @@ class httpsendfile(object):
defines a __len__ attribute to feed the Content-Length header.
"""
- def __init__(self, *args, **kwargs):
+ def __init__(self, ui, *args, **kwargs):
# We can't just "self._data = open(*args, **kwargs)" here because there
# is an "open" function defined in this module that shadows the global
# one
+ self.ui = ui
self._data = __builtin__.open(*args, **kwargs)
- self.read = self._data.read
self.seek = self._data.seek
self.close = self._data.close
self.write = self._data.write
+ self._len = os.fstat(self._data.fileno()).st_size
+ self._pos = 0
+ self._total = len(self) / 1024 * 2
+
+ def read(self, *args, **kwargs):
+ try:
+ ret = self._data.read(*args, **kwargs)
+ except EOFError:
+ self.ui.progress(_('sending'), None)
+ self._pos += len(ret)
+ # We pass double the max for total because we currently have
+ # to send the bundle twice in the case of a server that
+ # requires authentication. Since we can't know until we try
+ # once whether authentication will be required, just lie to
+ # the user and maybe the push succeeds suddenly at 50%.
+ self.ui.progress(_('sending'), self._pos / 1024,
+ unit=_('kb'), total=self._total)
+ return ret
def __len__(self):
- return os.fstat(self._data.fileno()).st_size
+ return self._len
-def _gen_sendfile(connection):
+def _gen_sendfile(orgsend):
def _sendfile(self, data):
# send a file
if isinstance(data, httpsendfile):
# if auth required, some data sent twice, so rewind here
data.seek(0)
for chunk in util.filechunkiter(data):
- connection.send(self, chunk)
+ orgsend(self, chunk)
else:
- connection.send(self, data)
+ orgsend(self, data)
return _sendfile
has_https = hasattr(urllib2, 'HTTPSHandler')
@@ -333,7 +353,7 @@ if has_https:
class httpconnection(keepalive.HTTPConnection):
# must be able to send big bundle as stream.
- send = _gen_sendfile(keepalive.HTTPConnection)
+ send = _gen_sendfile(keepalive.HTTPConnection.send)
def connect(self):
if has_https and self.realhostport: # use CONNECT proxy
@@ -522,32 +542,36 @@ def _verifycert(cert, hostname):
return _('no commonName or subjectAltName found in certificate')
if has_https:
- class BetterHTTPS(httplib.HTTPSConnection):
- send = keepalive.safesend
+ class httpsconnection(httplib.HTTPSConnection):
+ response_class = keepalive.HTTPResponse
+ # must be able to send big bundle as stream.
+ send = _gen_sendfile(keepalive.safesend)
+ getresponse = keepalive.wrapgetresponse(httplib.HTTPSConnection)
def connect(self):
- if hasattr(self, 'ui'):
- cacerts = self.ui.config('web', 'cacerts')
- if cacerts:
- cacerts = util.expandpath(cacerts)
- else:
- cacerts = None
+ self.sock = _create_connection((self.host, self.port))
+
+ host = self.host
+ if self.realhostport: # use CONNECT proxy
+ something = _generic_proxytunnel(self)
+ host = self.realhostport.rsplit(':', 1)[0]
- hostfingerprint = self.ui.config('hostfingerprints', self.host)
+ cacerts = self.ui.config('web', 'cacerts')
+ hostfingerprint = self.ui.config('hostfingerprints', host)
+
if cacerts and not hostfingerprint:
- sock = _create_connection((self.host, self.port))
- self.sock = _ssl_wrap_socket(sock, self.key_file,
- self.cert_file, cert_reqs=CERT_REQUIRED,
- ca_certs=cacerts)
- msg = _verifycert(self.sock.getpeercert(), self.host)
+ self.sock = _ssl_wrap_socket(self.sock, self.key_file,
+ self.cert_file, cert_reqs=CERT_REQUIRED,
+ ca_certs=util.expandpath(cacerts))
+ msg = _verifycert(self.sock.getpeercert(), host)
if msg:
raise util.Abort(_('%s certificate error: %s '
'(use --insecure to connect '
- 'insecurely)') % (self.host, msg))
- self.ui.debug('%s certificate successfully verified\n' %
- self.host)
+ 'insecurely)') % (host, msg))
+ self.ui.debug('%s certificate successfully verified\n' % host)
else:
- httplib.HTTPSConnection.connect(self)
+ self.sock = _ssl_wrap_socket(self.sock, self.key_file,
+ self.cert_file)
if hasattr(self.sock, 'getpeercert'):
peercert = self.sock.getpeercert(True)
peerfingerprint = util.sha1(peercert).hexdigest()
@@ -558,38 +582,22 @@ if has_https:
hostfingerprint.replace(':', '').lower():
raise util.Abort(_('invalid certificate for %s '
'with fingerprint %s') %
- (self.host, nicefingerprint))
+ (host, nicefingerprint))
self.ui.debug('%s certificate matched fingerprint %s\n' %
- (self.host, nicefingerprint))
+ (host, nicefingerprint))
else:
self.ui.warn(_('warning: %s certificate '
'with fingerprint %s not verified '
'(check hostfingerprints or web.cacerts '
'config setting)\n') %
- (self.host, nicefingerprint))
+ (host, nicefingerprint))
else: # python 2.5 ?
if hostfingerprint:
- raise util.Abort(_('no certificate for %s '
- 'with fingerprint') % self.host)
+ raise util.Abort(_('no certificate for %s with '
+ 'configured hostfingerprint') % host)
self.ui.warn(_('warning: %s certificate not verified '
'(check web.cacerts config setting)\n') %
- self.host)
-
- class httpsconnection(BetterHTTPS):
- response_class = keepalive.HTTPResponse
- # must be able to send big bundle as stream.
- send = _gen_sendfile(BetterHTTPS)
- getresponse = keepalive.wrapgetresponse(httplib.HTTPSConnection)
-
- def connect(self):
- if self.realhostport: # use CONNECT proxy
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.sock.connect((self.host, self.port))
- if _generic_proxytunnel(self):
- self.sock = _ssl_wrap_socket(self.sock, self.key_file,
- self.cert_file)
- else:
- BetterHTTPS.connect(self)
+ host)
class httpshandler(keepalive.KeepAliveHandler, urllib2.HTTPSHandler):
def __init__(self, ui):
@@ -603,7 +611,13 @@ if has_https:
return keepalive.KeepAliveHandler._start_transaction(self, h, req)
def https_open(self, req):
- self.auth = self.pwmgr.readauthtoken(req.get_full_url())
+ res = readauthforuri(self.ui, req.get_full_url())
+ if res:
+ group, auth = res
+ self.auth = auth
+ self.ui.debug("using auth.%s.* for authentication\n" % group)
+ else:
+ self.auth = None
return self.do_open(self._makeconnection, req)
def _makeconnection(self, host, port=None, *args, **kwargs):
diff --git a/mercurial/util.py b/mercurial/util.py
--- a/mercurial/util.py
+++ b/mercurial/util.py
@@ -198,7 +198,10 @@ def tempfilter(s, cmd):
if code:
raise Abort(_("command '%s' failed: %s") %
(cmd, explain_exit(code)))
- return open(outname, 'rb').read()
+ fp = open(outname, 'rb')
+ r = fp.read()
+ fp.close()
+ return r
finally:
try:
if inname:
@@ -431,7 +434,7 @@ def checksignature(func):
return check
-def unlink(f):
+def unlinkpath(f):
"""unlink and remove the directory if it is empty"""
os.unlink(f)
# try removing directories that might now be empty
@@ -451,7 +454,7 @@ def copyfile(src, dest):
else:
try:
shutil.copyfile(src, dest)
- shutil.copystat(src, dest)
+ shutil.copymode(src, dest)
except shutil.Error, inst:
raise Abort(str(inst))
@@ -487,6 +490,7 @@ class path_auditor(object):
'''ensure that a filesystem path contains no banned components.
the following properties of a path are checked:
+ - ends with a directory separator
- under top-level .hg
- starts at the root of a windows drive
- contains ".."
@@ -504,6 +508,9 @@ class path_auditor(object):
def __call__(self, path):
if path in self.audited:
return
+ # AIX ignores "/" at end of path, others raise EISDIR.
+ if endswithsep(path):
+ raise Abort(_("path ends in directory separator: %s") % path)
normpath = os.path.normcase(path)
parts = splitpath(normpath)
if (os.path.splitdrive(path)[0]
@@ -550,16 +557,6 @@ class path_auditor(object):
# want to add "foo/bar/baz" before checking if there's a "foo/.hg"
self.auditeddir.update(prefixes)
-def nlinks(pathname):
- """Return number of hardlinks for the given file."""
- return os.lstat(pathname).st_nlink
-
-if hasattr(os, 'link'):
- os_link = os.link
-else:
- def os_link(src, dst):
- raise OSError(0, _("Hardlinks not supported"))
-
def lookup_reg(key, name=None, scope=None):
return None
@@ -597,7 +594,10 @@ def readlock(pathname):
raise
except AttributeError: # no symlink in os
pass
- return posixfile(pathname).read()
+ fp = posixfile(pathname)
+ r = fp.read()
+ fp.close()
+ return r
def fstat(fp):
'''stat file object that may not have fileno method.'''
@@ -738,7 +738,7 @@ def checknlink(testfile):
# nlinks() may behave differently for files on Windows shares if
# the file is open.
- fd = open(f2)
+ fd = posixfile(f2)
return nlinks(f2) > 1
finally:
if fd is not None:
@@ -837,7 +837,7 @@ class atomictempfile(object):
self._fp.close()
rename(self.temp, localpath(self.__name))
- def __del__(self):
+ def close(self):
if not self._fp:
return
if not self._fp.closed:
@@ -846,6 +846,9 @@ class atomictempfile(object):
except: pass
self._fp.close()
+ def __del__(self):
+ self.close()
+
def makedirs(name, mode=None):
"""recursive directory creation with parent mode inheritance"""
parent = os.path.abspath(os.path.dirname(name))
@@ -894,7 +897,6 @@ class opener(object):
mode += "b" # for that other OS
nlink = -1
- st_mode = None
dirname, basename = os.path.split(f)
# If basename is empty, then the path is malformed because it points
# to a directory. Let the posixfile() call below raise IOError.
@@ -905,18 +907,19 @@ class opener(object):
return atomictempfile(f, mode, self.createmode)
try:
if 'w' in mode:
- st_mode = os.lstat(f).st_mode & 0777
- os.unlink(f)
+ unlink(f)
nlink = 0
else:
# nlinks() may behave differently for files on Windows
# shares if the file is open.
- fd = open(f)
+ fd = posixfile(f)
nlink = nlinks(f)
if nlink < 1:
nlink = 2 # force mktempcopy (issue1922)
fd.close()
- except (OSError, IOError):
+ except (OSError, IOError), e:
+ if e.errno != errno.ENOENT:
+ raise
nlink = 0
if not os.path.isdir(dirname):
makedirs(dirname, self.createmode)
@@ -927,10 +930,7 @@ class opener(object):
rename(mktempcopy(f), f)
fp = posixfile(f, mode)
if nlink == 0:
- if st_mode is None:
- self._fixfilemode(f)
- else:
- os.chmod(f, st_mode)
+ self._fixfilemode(f)
return fp
def symlink(self, src, dst):
@@ -1075,7 +1075,7 @@ def strdate(string, format, defaults=[])
# NOTE: unixtime = localunixtime + offset
offset, date = timezone(string), string
- if offset != None:
+ if offset is not None:
date = " ".join(string.split()[:-1])
# add missing elements from defaults
@@ -1120,7 +1120,7 @@ def parsedate(date, formats=None, bias={
now = makedate()
defaults = {}
nowmap = {}
- for part in "d mb yY HI M S".split():
+ for part in ("d", "mb", "yY", "HI", "M", "S"):
# this piece is for rounding the specific end of unknowns
b = bias.get(part)
if b is None:
@@ -1190,7 +1190,7 @@ def matchdate(date):
def upper(date):
d = dict(mb="12", HI="23", M="59", S="59")
- for days in "31 30 29".split():
+ for days in ("31", "30", "29"):
try:
d["d"] = days
return parsedate(date, extendeddateformats, d)[0]
@@ -1387,37 +1387,48 @@ def uirepr(s):
# Avoid double backslash in Windows path repr()
return repr(s).replace('\\\\', '\\')
-#### naming convention of below implementation follows 'textwrap' module
+# delay import of textwrap
+def MBTextWrapper(**kwargs):
+ class tw(textwrap.TextWrapper):
+ """
+ Extend TextWrapper for double-width characters.
-class MBTextWrapper(textwrap.TextWrapper):
- def __init__(self, **kwargs):
- textwrap.TextWrapper.__init__(self, **kwargs)
+ Some Asian characters use two terminal columns instead of one.
+ A good example of this behavior can be seen with u'\u65e5\u672c',
+ the two Japanese characters for "Japan":
+ len() returns 2, but when printed to a terminal, they eat 4 columns.
+
+ (Note that this has nothing to do whatsoever with unicode
+ representation, or encoding of the underlying string)
+ """
+ def __init__(self, **kwargs):
+ textwrap.TextWrapper.__init__(self, **kwargs)
- def _cutdown(self, str, space_left):
- l = 0
- ucstr = unicode(str, encoding.encoding)
- w = unicodedata.east_asian_width
- for i in xrange(len(ucstr)):
- l += w(ucstr[i]) in 'WFA' and 2 or 1
- if space_left < l:
- return (ucstr[:i].encode(encoding.encoding),
- ucstr[i:].encode(encoding.encoding))
- return str, ''
+ def _cutdown(self, str, space_left):
+ l = 0
+ ucstr = unicode(str, encoding.encoding)
+ colwidth = unicodedata.east_asian_width
+ for i in xrange(len(ucstr)):
+ l += colwidth(ucstr[i]) in 'WFA' and 2 or 1
+ if space_left < l:
+ return (ucstr[:i].encode(encoding.encoding),
+ ucstr[i:].encode(encoding.encoding))
+ return str, ''
- # ----------------------------------------
- # overriding of base class
-
- def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
- space_left = max(width - cur_len, 1)
+ # overriding of base class
+ def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
+ space_left = max(width - cur_len, 1)
- if self.break_long_words:
- cut, res = self._cutdown(reversed_chunks[-1], space_left)
- cur_line.append(cut)
- reversed_chunks[-1] = res
- elif not cur_line:
- cur_line.append(reversed_chunks.pop())
+ if self.break_long_words:
+ cut, res = self._cutdown(reversed_chunks[-1], space_left)
+ cur_line.append(cut)
+ reversed_chunks[-1] = res
+ elif not cur_line:
+ cur_line.append(reversed_chunks.pop())
-#### naming convention of above implementation follows 'textwrap' module
+ global MBTextWrapper
+ MBTextWrapper = tw
+ return tw(**kwargs)
def wrap(line, width, initindent='', hangindent=''):
maxindent = max(len(hangindent), len(initindent))
@@ -1497,7 +1508,7 @@ except NameError:
return False
return True
-def interpolate(prefix, mapping, s, fn=None):
+def interpolate(prefix, mapping, s, fn=None, escape_prefix=False):
"""Return the result of interpolating items in the mapping into string s.
prefix is a single character string, or a two character string with
@@ -1506,9 +1517,20 @@ def interpolate(prefix, mapping, s, fn=N
fn is an optional function that will be applied to the replacement text
just before replacement.
+
+ escape_prefix is an optional flag that allows using doubled prefix for
+ its escaping.
"""
fn = fn or (lambda s: s)
- r = re.compile(r'%s(%s)' % (prefix, '|'.join(mapping.keys())))
+ patterns = '|'.join(mapping.keys())
+ if escape_prefix:
+ patterns += '|' + prefix
+ if len(prefix) > 1:
+ prefix_char = prefix[1:]
+ else:
+ prefix_char = prefix
+ mapping[prefix_char] = prefix_char
+ r = re.compile(r'%s(%s)' % (prefix, patterns))
return r.sub(lambda x: fn(mapping[x.group()[1:]]), s)
def getport(port):
diff --git a/mercurial/verify.py b/mercurial/verify.py
--- a/mercurial/verify.py
+++ b/mercurial/verify.py
@@ -34,7 +34,7 @@ def _verify(repo):
raise util.Abort(_("cannot verify bundle or remote repos"))
def err(linkrev, msg, filename=None):
- if linkrev != None:
+ if linkrev is not None:
badrevs.add(linkrev)
else:
linkrev = '?'
diff --git a/mercurial/win32.py b/mercurial/win32.py
--- a/mercurial/win32.py
+++ b/mercurial/win32.py
@@ -5,73 +5,173 @@
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
-"""Utility functions that use win32 API.
+import encoding
+import ctypes, errno, os, struct, subprocess
+
+_kernel32 = ctypes.windll.kernel32
+
+_BOOL = ctypes.c_long
+_WORD = ctypes.c_ushort
+_DWORD = ctypes.c_ulong
+_LPCSTR = _LPSTR = ctypes.c_char_p
+_HANDLE = ctypes.c_void_p
+_HWND = _HANDLE
+
+_INVALID_HANDLE_VALUE = -1
+
+# GetLastError
+_ERROR_SUCCESS = 0
+_ERROR_INVALID_PARAMETER = 87
+_ERROR_INSUFFICIENT_BUFFER = 122
+
+# WPARAM is defined as UINT_PTR (unsigned type)
+# LPARAM is defined as LONG_PTR (signed type)
+if ctypes.sizeof(ctypes.c_long) == ctypes.sizeof(ctypes.c_void_p):
+ _WPARAM = ctypes.c_ulong
+ _LPARAM = ctypes.c_long
+elif ctypes.sizeof(ctypes.c_longlong) == ctypes.sizeof(ctypes.c_void_p):
+ _WPARAM = ctypes.c_ulonglong
+ _LPARAM = ctypes.c_longlong
+
+class _FILETIME(ctypes.Structure):
+ _fields_ = [('dwLowDateTime', _DWORD),
+ ('dwHighDateTime', _DWORD)]
+
+class _BY_HANDLE_FILE_INFORMATION(ctypes.Structure):
+ _fields_ = [('dwFileAttributes', _DWORD),
+ ('ftCreationTime', _FILETIME),
+ ('ftLastAccessTime', _FILETIME),
+ ('ftLastWriteTime', _FILETIME),
+ ('dwVolumeSerialNumber', _DWORD),
+ ('nFileSizeHigh', _DWORD),
+ ('nFileSizeLow', _DWORD),
+ ('nNumberOfLinks', _DWORD),
+ ('nFileIndexHigh', _DWORD),
+ ('nFileIndexLow', _DWORD)]
+
+# CreateFile
+_FILE_SHARE_READ = 0x00000001
+_FILE_SHARE_WRITE = 0x00000002
+_FILE_SHARE_DELETE = 0x00000004
+
+_OPEN_EXISTING = 3
+
+# Process Security and Access Rights
+_PROCESS_QUERY_INFORMATION = 0x0400
+
+# GetExitCodeProcess
+_STILL_ACTIVE = 259
+
+# registry
+_HKEY_CURRENT_USER = 0x80000001L
+_HKEY_LOCAL_MACHINE = 0x80000002L
+_KEY_READ = 0x20019
+_REG_SZ = 1
+_REG_DWORD = 4
-Mark Hammond's win32all package allows better functionality on
-Windows. This module overrides definitions in util.py. If not
-available, import of this module will fail, and generic code will be
-used.
-"""
+class _STARTUPINFO(ctypes.Structure):
+ _fields_ = [('cb', _DWORD),
+ ('lpReserved', _LPSTR),
+ ('lpDesktop', _LPSTR),
+ ('lpTitle', _LPSTR),
+ ('dwX', _DWORD),
+ ('dwY', _DWORD),
+ ('dwXSize', _DWORD),
+ ('dwYSize', _DWORD),
+ ('dwXCountChars', _DWORD),
+ ('dwYCountChars', _DWORD),
+ ('dwFillAttribute', _DWORD),
+ ('dwFlags', _DWORD),
+ ('wShowWindow', _WORD),
+ ('cbReserved2', _WORD),
+ ('lpReserved2', ctypes.c_char_p),
+ ('hStdInput', _HANDLE),
+ ('hStdOutput', _HANDLE),
+ ('hStdError', _HANDLE)]
+
+class _PROCESS_INFORMATION(ctypes.Structure):
+ _fields_ = [('hProcess', _HANDLE),
+ ('hThread', _HANDLE),
+ ('dwProcessId', _DWORD),
+ ('dwThreadId', _DWORD)]
+
+_DETACHED_PROCESS = 0x00000008
+_STARTF_USESHOWWINDOW = 0x00000001
+_SW_HIDE = 0
-import win32api
+class _COORD(ctypes.Structure):
+ _fields_ = [('X', ctypes.c_short),
+ ('Y', ctypes.c_short)]
+
+class _SMALL_RECT(ctypes.Structure):
+ _fields_ = [('Left', ctypes.c_short),
+ ('Top', ctypes.c_short),
+ ('Right', ctypes.c_short),
+ ('Bottom', ctypes.c_short)]
+
+class _CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure):
+ _fields_ = [('dwSize', _COORD),
+ ('dwCursorPosition', _COORD),
+ ('wAttributes', _WORD),
+ ('srWindow', _SMALL_RECT),
+ ('dwMaximumWindowSize', _COORD)]
-import errno, os, sys, pywintypes, win32con, win32file, win32process
-import winerror, win32gui, win32console
-import osutil, encoding
-from win32com.shell import shell, shellcon
+_STD_ERROR_HANDLE = 0xfffffff4L # (DWORD)-12
+
+def _raiseoserror(name):
+ err = ctypes.WinError()
+ raise OSError(err.errno, '%s: %s' % (name, err.strerror))
+
+def _getfileinfo(name):
+ fh = _kernel32.CreateFileA(name, 0,
+ _FILE_SHARE_READ | _FILE_SHARE_WRITE | _FILE_SHARE_DELETE,
+ None, _OPEN_EXISTING, 0, None)
+ if fh == _INVALID_HANDLE_VALUE:
+ _raiseoserror(name)
+ try:
+ fi = _BY_HANDLE_FILE_INFORMATION()
+ if not _kernel32.GetFileInformationByHandle(fh, ctypes.byref(fi)):
+ _raiseoserror(name)
+ return fi
+ finally:
+ _kernel32.CloseHandle(fh)
def os_link(src, dst):
- try:
- win32file.CreateHardLink(dst, src)
- except pywintypes.error:
- raise OSError(errno.EINVAL, 'target implements hardlinks improperly')
- except NotImplementedError: # Another fake error win Win98
- raise OSError(errno.EINVAL, 'Hardlinking not supported')
+ if not _kernel32.CreateHardLinkA(dst, src, None):
+ _raiseoserror(src)
-def _getfileinfo(pathname):
- """Return number of hardlinks for the given file."""
- try:
- fh = win32file.CreateFile(pathname,
- win32file.GENERIC_READ, win32file.FILE_SHARE_READ,
- None, win32file.OPEN_EXISTING, 0, None)
- except pywintypes.error:
- raise OSError(errno.ENOENT, 'The system cannot find the file specified')
- try:
- return win32file.GetFileInformationByHandle(fh)
- finally:
- fh.Close()
-
-def nlinks(pathname):
- """Return number of hardlinks for the given file."""
- return _getfileinfo(pathname)[7]
+def nlinks(name):
+ '''return number of hardlinks for the given file'''
+ return _getfileinfo(name).nNumberOfLinks
def samefile(fpath1, fpath2):
- """Returns whether fpath1 and fpath2 refer to the same file. This is only
- guaranteed to work for files, not directories."""
+ '''Returns whether fpath1 and fpath2 refer to the same file. This is only
+ guaranteed to work for files, not directories.'''
res1 = _getfileinfo(fpath1)
res2 = _getfileinfo(fpath2)
- # Index 4 is the volume serial number, and 8 and 9 contain the file ID
- return res1[4] == res2[4] and res1[8] == res2[8] and res1[9] == res2[9]
+ return (res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber
+ and res1.nFileIndexHigh == res2.nFileIndexHigh
+ and res1.nFileIndexLow == res2.nFileIndexLow)
def samedevice(fpath1, fpath2):
- """Returns whether fpath1 and fpath2 are on the same device. This is only
- guaranteed to work for files, not directories."""
+ '''Returns whether fpath1 and fpath2 are on the same device. This is only
+ guaranteed to work for files, not directories.'''
res1 = _getfileinfo(fpath1)
res2 = _getfileinfo(fpath2)
- return res1[4] == res2[4]
+ return res1.dwVolumeSerialNumber == res2.dwVolumeSerialNumber
def testpid(pid):
'''return True if pid is still running or unable to
determine, False otherwise'''
- try:
- handle = win32api.OpenProcess(
- win32con.PROCESS_QUERY_INFORMATION, False, pid)
- if handle:
- status = win32process.GetExitCodeProcess(handle)
- return status == win32con.STILL_ACTIVE
- except pywintypes.error, details:
- return details[0] != winerror.ERROR_INVALID_PARAMETER
- return True
+ h = _kernel32.OpenProcess(_PROCESS_QUERY_INFORMATION, False, pid)
+ if h:
+ try:
+ status = _DWORD()
+ if _kernel32.GetExitCodeProcess(h, ctypes.byref(status)):
+ return status.value == _STILL_ACTIVE
+ finally:
+ _kernel32.CloseHandle(h)
+ return _kernel32.GetLastError() != _ERROR_INVALID_PARAMETER
def lookup_reg(key, valname=None, scope=None):
''' Look up a key/value name in the Windows registry.
@@ -82,101 +182,137 @@ def lookup_reg(key, valname=None, scope=
a sequence of scopes to look up in order. Default (CURRENT_USER,
LOCAL_MACHINE).
'''
- try:
- from _winreg import HKEY_CURRENT_USER, HKEY_LOCAL_MACHINE, \
- QueryValueEx, OpenKey
- except ImportError:
- return None
-
+ adv = ctypes.windll.advapi32
+ byref = ctypes.byref
if scope is None:
- scope = (HKEY_CURRENT_USER, HKEY_LOCAL_MACHINE)
+ scope = (_HKEY_CURRENT_USER, _HKEY_LOCAL_MACHINE)
elif not isinstance(scope, (list, tuple)):
scope = (scope,)
for s in scope:
+ kh = _HANDLE()
+ res = adv.RegOpenKeyExA(s, key, 0, _KEY_READ, ctypes.byref(kh))
+ if res != _ERROR_SUCCESS:
+ continue
try:
- val = QueryValueEx(OpenKey(s, key), valname)[0]
- # never let a Unicode string escape into the wild
- return encoding.tolocal(val.encode('UTF-8'))
- except EnvironmentError:
- pass
+ size = _DWORD(600)
+ type = _DWORD()
+ buf = ctypes.create_string_buffer(size.value + 1)
+ res = adv.RegQueryValueExA(kh.value, valname, None,
+ byref(type), buf, byref(size))
+ if res != _ERROR_SUCCESS:
+ continue
+ if type.value == _REG_SZ:
+ # never let a Unicode string escape into the wild
+ return encoding.tolocal(buf.value.encode('UTF-8'))
+ elif type.value == _REG_DWORD:
+ fmt = '