##// END OF EJS Templates
posix: do not use fstat in isowner...
Martin Geisler -
r8657:3fa92c61 default
parent child Browse files
Show More
@@ -1,219 +1,214 b''
1 1 # posix.py - Posix utility function implementations for Mercurial
2 2 #
3 3 # Copyright 2005-2009 Matt Mackall <mpm@selenic.com> and others
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2, incorporated herein by reference.
7 7
8 8 from i18n import _
9 9 import osutil
10 10 import os, sys, errno, stat, getpass, pwd, grp
11 11
12 12 posixfile = file
13 13 nulldev = '/dev/null'
14 14 normpath = os.path.normpath
15 15 samestat = os.path.samestat
16 16 expandglobs = False
17 17
18 18 umask = os.umask(0)
19 19 os.umask(umask)
20 20
21 21 def openhardlinks():
22 22 '''return true if it is safe to hold open file handles to hardlinks'''
23 23 return True
24 24
25 25 def rcfiles(path):
26 26 rcs = [os.path.join(path, 'hgrc')]
27 27 rcdir = os.path.join(path, 'hgrc.d')
28 28 try:
29 29 rcs.extend([os.path.join(rcdir, f)
30 30 for f, kind in osutil.listdir(rcdir)
31 31 if f.endswith(".rc")])
32 32 except OSError:
33 33 pass
34 34 return rcs
35 35
36 36 def system_rcpath():
37 37 path = []
38 38 # old mod_python does not set sys.argv
39 39 if len(getattr(sys, 'argv', [])) > 0:
40 40 path.extend(rcfiles(os.path.dirname(sys.argv[0]) +
41 41 '/../etc/mercurial'))
42 42 path.extend(rcfiles('/etc/mercurial'))
43 43 return path
44 44
45 45 def user_rcpath():
46 46 return [os.path.expanduser('~/.hgrc')]
47 47
48 48 def parse_patch_output(output_line):
49 49 """parses the output produced by patch and returns the file name"""
50 50 pf = output_line[14:]
51 51 if os.sys.platform == 'OpenVMS':
52 52 if pf[0] == '`':
53 53 pf = pf[1:-1] # Remove the quotes
54 54 else:
55 55 if pf.startswith("'") and pf.endswith("'") and " " in pf:
56 56 pf = pf[1:-1] # Remove the quotes
57 57 return pf
58 58
59 59 def sshargs(sshcmd, host, user, port):
60 60 '''Build argument list for ssh'''
61 61 args = user and ("%s@%s" % (user, host)) or host
62 62 return port and ("%s -p %s" % (args, port)) or args
63 63
64 64 def is_exec(f):
65 65 """check whether a file is executable"""
66 66 return (os.lstat(f).st_mode & 0100 != 0)
67 67
68 68 def set_flags(f, l, x):
69 69 s = os.lstat(f).st_mode
70 70 if l:
71 71 if not stat.S_ISLNK(s):
72 72 # switch file to link
73 73 data = file(f).read()
74 74 os.unlink(f)
75 75 try:
76 76 os.symlink(data, f)
77 77 except:
78 78 # failed to make a link, rewrite file
79 79 file(f, "w").write(data)
80 80 # no chmod needed at this point
81 81 return
82 82 if stat.S_ISLNK(s):
83 83 # switch link to file
84 84 data = os.readlink(f)
85 85 os.unlink(f)
86 86 file(f, "w").write(data)
87 87 s = 0666 & ~umask # avoid restatting for chmod
88 88
89 89 sx = s & 0100
90 90 if x and not sx:
91 91 # Turn on +x for every +r bit when making a file executable
92 92 # and obey umask.
93 93 os.chmod(f, s | (s & 0444) >> 2 & ~umask)
94 94 elif not x and sx:
95 95 # Turn off all +x bits
96 96 os.chmod(f, s & 0666)
97 97
98 98 def set_binary(fd):
99 99 pass
100 100
101 101 def pconvert(path):
102 102 return path
103 103
104 104 def localpath(path):
105 105 return path
106 106
107 107 def shellquote(s):
108 108 if os.sys.platform == 'OpenVMS':
109 109 return '"%s"' % s
110 110 else:
111 111 return "'%s'" % s.replace("'", "'\\''")
112 112
113 113 def quotecommand(cmd):
114 114 return cmd
115 115
116 116 def popen(command, mode='r'):
117 117 return os.popen(command, mode)
118 118
119 119 def testpid(pid):
120 120 '''return False if pid dead, True if running or not sure'''
121 121 if os.sys.platform == 'OpenVMS':
122 122 return True
123 123 try:
124 124 os.kill(pid, 0)
125 125 return True
126 126 except OSError, inst:
127 127 return inst.errno != errno.ESRCH
128 128
129 129 def explain_exit(code):
130 130 """return a 2-tuple (desc, code) describing a process's status"""
131 131 if os.WIFEXITED(code):
132 132 val = os.WEXITSTATUS(code)
133 133 return _("exited with status %d") % val, val
134 134 elif os.WIFSIGNALED(code):
135 135 val = os.WTERMSIG(code)
136 136 return _("killed by signal %d") % val, val
137 137 elif os.WIFSTOPPED(code):
138 138 val = os.WSTOPSIG(code)
139 139 return _("stopped by signal %d") % val, val
140 140 raise ValueError(_("invalid exit code"))
141 141
142 def isowner(fp, st=None):
143 """Return True if the file object f belongs to the current user.
144
145 The return value of a util.fstat(f) may be passed as the st argument.
146 """
147 if st is None:
148 st = fstat(fp)
142 def isowner(st):
143 """Return True if the stat object st is from the current user."""
149 144 return st.st_uid == os.getuid()
150 145
151 146 def find_exe(command):
152 147 '''Find executable for command searching like which does.
153 148 If command is a basename then PATH is searched for command.
154 149 PATH isn't searched if command is an absolute or relative path.
155 150 If command isn't found None is returned.'''
156 151 if sys.platform == 'OpenVMS':
157 152 return command
158 153
159 154 def findexisting(executable):
160 155 'Will return executable if existing file'
161 156 if os.path.exists(executable):
162 157 return executable
163 158 return None
164 159
165 160 if os.sep in command:
166 161 return findexisting(command)
167 162
168 163 for path in os.environ.get('PATH', '').split(os.pathsep):
169 164 executable = findexisting(os.path.join(path, command))
170 165 if executable is not None:
171 166 return executable
172 167 return None
173 168
174 169 def set_signal_handler():
175 170 pass
176 171
177 172 def statfiles(files):
178 173 'Stat each file in files and yield stat or None if file does not exist.'
179 174 lstat = os.lstat
180 175 for nf in files:
181 176 try:
182 177 st = lstat(nf)
183 178 except OSError, err:
184 179 if err.errno not in (errno.ENOENT, errno.ENOTDIR):
185 180 raise
186 181 st = None
187 182 yield st
188 183
189 184 def getuser():
190 185 '''return name of current user'''
191 186 return getpass.getuser()
192 187
193 188 def expand_glob(pats):
194 189 '''On Windows, expand the implicit globs in a list of patterns'''
195 190 return list(pats)
196 191
197 192 def username(uid=None):
198 193 """Return the name of the user with the given uid.
199 194
200 195 If uid is None, return the name of the current user."""
201 196
202 197 if uid is None:
203 198 uid = os.getuid()
204 199 try:
205 200 return pwd.getpwuid(uid)[0]
206 201 except KeyError:
207 202 return str(uid)
208 203
209 204 def groupname(gid=None):
210 205 """Return the name of the group with the given gid.
211 206
212 207 If gid is None, return the name of the current group."""
213 208
214 209 if gid is None:
215 210 gid = os.getgid()
216 211 try:
217 212 return grp.getgrgid(gid)[0]
218 213 except KeyError:
219 214 return str(gid)
@@ -1,346 +1,346 b''
1 1 # ui.py - user interface bits for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2, incorporated herein by reference.
7 7
8 8 from i18n import _
9 9 import errno, getpass, os, socket, sys, tempfile, traceback
10 10 import config, util, error
11 11
12 12 _booleans = {'1': True, 'yes': True, 'true': True, 'on': True,
13 13 '0': False, 'no': False, 'false': False, 'off': False}
14 14
15 15 class ui(object):
16 16 def __init__(self, src=None):
17 17 self._buffers = []
18 18 self.quiet = self.verbose = self.debugflag = self._traceback = False
19 19 self._reportuntrusted = True
20 20 self._ocfg = config.config() # overlay
21 21 self._tcfg = config.config() # trusted
22 22 self._ucfg = config.config() # untrusted
23 23 self._trustusers = set()
24 24 self._trustgroups = set()
25 25
26 26 if src:
27 27 self._tcfg = src._tcfg.copy()
28 28 self._ucfg = src._ucfg.copy()
29 29 self._ocfg = src._ocfg.copy()
30 30 self._trustusers = src._trustusers.copy()
31 31 self._trustgroups = src._trustgroups.copy()
32 32 self.fixconfig()
33 33 else:
34 34 # we always trust global config files
35 35 for f in util.rcpath():
36 36 self.readconfig(f, trust=True)
37 37
38 38 def copy(self):
39 39 return self.__class__(self)
40 40
41 41 def _is_trusted(self, fp, f):
42 42 st = util.fstat(fp)
43 if util.isowner(fp, st):
43 if util.isowner(st):
44 44 return True
45 45
46 46 tusers, tgroups = self._trustusers, self._trustgroups
47 47 if '*' in tusers or '*' in tgroups:
48 48 return True
49 49
50 50 user = util.username(st.st_uid)
51 51 group = util.groupname(st.st_gid)
52 52 if user in tusers or group in tgroups or user == util.username():
53 53 return True
54 54
55 55 if self._reportuntrusted:
56 56 self.warn(_('Not trusting file %s from untrusted '
57 57 'user %s, group %s\n') % (f, user, group))
58 58 return False
59 59
60 60 def readconfig(self, filename, root=None, trust=False,
61 61 sections=None, remap=None):
62 62 try:
63 63 fp = open(filename)
64 64 except IOError:
65 65 if not sections: # ignore unless we were looking for something
66 66 return
67 67 raise
68 68
69 69 cfg = config.config()
70 70 trusted = sections or trust or self._is_trusted(fp, filename)
71 71
72 72 try:
73 73 cfg.read(filename, fp, sections=sections, remap=remap)
74 74 except error.ConfigError, inst:
75 75 if trusted:
76 76 raise
77 77 self.warn(_("Ignored: %s\n") % str(inst))
78 78
79 79 if trusted:
80 80 self._tcfg.update(cfg)
81 81 self._tcfg.update(self._ocfg)
82 82 self._ucfg.update(cfg)
83 83 self._ucfg.update(self._ocfg)
84 84
85 85 if root is None:
86 86 root = os.path.expanduser('~')
87 87 self.fixconfig(root=root)
88 88
89 89 def fixconfig(self, root=None):
90 90 # translate paths relative to root (or home) into absolute paths
91 91 root = root or os.getcwd()
92 92 for c in self._tcfg, self._ucfg, self._ocfg:
93 93 for n, p in c.items('paths'):
94 94 if p and "://" not in p and not os.path.isabs(p):
95 95 c.set("paths", n, os.path.normpath(os.path.join(root, p)))
96 96
97 97 # update ui options
98 98 self.debugflag = self.configbool('ui', 'debug')
99 99 self.verbose = self.debugflag or self.configbool('ui', 'verbose')
100 100 self.quiet = not self.debugflag and self.configbool('ui', 'quiet')
101 101 if self.verbose and self.quiet:
102 102 self.quiet = self.verbose = False
103 103 self._reportuntrusted = self.configbool("ui", "report_untrusted", True)
104 104 self._traceback = self.configbool('ui', 'traceback', False)
105 105
106 106 # update trust information
107 107 self._trustusers.update(self.configlist('trusted', 'users'))
108 108 self._trustgroups.update(self.configlist('trusted', 'groups'))
109 109
110 110 def setconfig(self, section, name, value):
111 111 for cfg in (self._ocfg, self._tcfg, self._ucfg):
112 112 cfg.set(section, name, value)
113 113 self.fixconfig()
114 114
115 115 def _data(self, untrusted):
116 116 return untrusted and self._ucfg or self._tcfg
117 117
118 118 def configsource(self, section, name, untrusted=False):
119 119 return self._data(untrusted).source(section, name) or 'none'
120 120
121 121 def config(self, section, name, default=None, untrusted=False):
122 122 value = self._data(untrusted).get(section, name, default)
123 123 if self.debugflag and not untrusted and self._reportuntrusted:
124 124 uvalue = self._ucfg.get(section, name)
125 125 if uvalue is not None and uvalue != value:
126 126 self.debug(_("ignoring untrusted configuration option "
127 127 "%s.%s = %s\n") % (section, name, uvalue))
128 128 return value
129 129
130 130 def configbool(self, section, name, default=False, untrusted=False):
131 131 v = self.config(section, name, None, untrusted)
132 132 if v is None:
133 133 return default
134 134 if v.lower() not in _booleans:
135 135 raise error.ConfigError(_("%s.%s not a boolean ('%s')")
136 136 % (section, name, v))
137 137 return _booleans[v.lower()]
138 138
139 139 def configlist(self, section, name, default=None, untrusted=False):
140 140 """Return a list of comma/space separated strings"""
141 141 result = self.config(section, name, untrusted=untrusted)
142 142 if result is None:
143 143 result = default or []
144 144 if isinstance(result, basestring):
145 145 result = result.replace(",", " ").split()
146 146 return result
147 147
148 148 def has_section(self, section, untrusted=False):
149 149 '''tell whether section exists in config.'''
150 150 return section in self._data(untrusted)
151 151
152 152 def configitems(self, section, untrusted=False):
153 153 items = self._data(untrusted).items(section)
154 154 if self.debugflag and not untrusted and self._reportuntrusted:
155 155 for k, v in self._ucfg.items(section):
156 156 if self._tcfg.get(section, k) != v:
157 157 self.debug(_("ignoring untrusted configuration option "
158 158 "%s.%s = %s\n") % (section, k, v))
159 159 return items
160 160
161 161 def walkconfig(self, untrusted=False):
162 162 cfg = self._data(untrusted)
163 163 for section in cfg.sections():
164 164 for name, value in self.configitems(section, untrusted):
165 165 yield section, name, str(value).replace('\n', '\\n')
166 166
167 167 def username(self):
168 168 """Return default username to be used in commits.
169 169
170 170 Searched in this order: $HGUSER, [ui] section of hgrcs, $EMAIL
171 171 and stop searching if one of these is set.
172 172 If not found and ui.askusername is True, ask the user, else use
173 173 ($LOGNAME or $USER or $LNAME or $USERNAME) + "@full.hostname".
174 174 """
175 175 user = os.environ.get("HGUSER")
176 176 if user is None:
177 177 user = self.config("ui", "username")
178 178 if user is None:
179 179 user = os.environ.get("EMAIL")
180 180 if user is None and self.configbool("ui", "askusername"):
181 181 user = self.prompt(_("enter a commit username:"), default=None)
182 182 if user is None:
183 183 try:
184 184 user = '%s@%s' % (util.getuser(), socket.getfqdn())
185 185 self.warn(_("No username found, using '%s' instead\n") % user)
186 186 except KeyError:
187 187 pass
188 188 if not user:
189 189 raise util.Abort(_("Please specify a username."))
190 190 if "\n" in user:
191 191 raise util.Abort(_("username %s contains a newline\n") % repr(user))
192 192 return user
193 193
194 194 def shortuser(self, user):
195 195 """Return a short representation of a user name or email address."""
196 196 if not self.verbose: user = util.shortuser(user)
197 197 return user
198 198
199 199 def _path(self, loc):
200 200 p = self.config('paths', loc)
201 201 if p and '%%' in p:
202 202 self.warn('(deprecated \'%%\' in path %s=%s from %s)\n' %
203 203 (loc, p, self.configsource('paths', loc)))
204 204 p = p.replace('%%', '%')
205 205 return p
206 206
207 207 def expandpath(self, loc, default=None):
208 208 """Return repository location relative to cwd or from [paths]"""
209 209 if "://" in loc or os.path.isdir(os.path.join(loc, '.hg')):
210 210 return loc
211 211
212 212 path = self._path(loc)
213 213 if not path and default is not None:
214 214 path = self._path(default)
215 215 return path or loc
216 216
217 217 def pushbuffer(self):
218 218 self._buffers.append([])
219 219
220 220 def popbuffer(self):
221 221 return "".join(self._buffers.pop())
222 222
223 223 def write(self, *args):
224 224 if self._buffers:
225 225 self._buffers[-1].extend([str(a) for a in args])
226 226 else:
227 227 for a in args:
228 228 sys.stdout.write(str(a))
229 229
230 230 def write_err(self, *args):
231 231 try:
232 232 if not sys.stdout.closed: sys.stdout.flush()
233 233 for a in args:
234 234 sys.stderr.write(str(a))
235 235 # stderr may be buffered under win32 when redirected to files,
236 236 # including stdout.
237 237 if not sys.stderr.closed: sys.stderr.flush()
238 238 except IOError, inst:
239 239 if inst.errno != errno.EPIPE:
240 240 raise
241 241
242 242 def flush(self):
243 243 try: sys.stdout.flush()
244 244 except: pass
245 245 try: sys.stderr.flush()
246 246 except: pass
247 247
248 248 def interactive(self):
249 249 i = self.configbool("ui", "interactive", None)
250 250 if i is None:
251 251 return sys.stdin.isatty()
252 252 return i
253 253
254 254 def _readline(self, prompt=''):
255 255 if sys.stdin.isatty():
256 256 try:
257 257 # magically add command line editing support, where
258 258 # available
259 259 import readline
260 260 # force demandimport to really load the module
261 261 readline.read_history_file
262 262 # windows sometimes raises something other than ImportError
263 263 except Exception:
264 264 pass
265 265 line = raw_input(prompt)
266 266 # When stdin is in binary mode on Windows, it can cause
267 267 # raw_input() to emit an extra trailing carriage return
268 268 if os.linesep == '\r\n' and line and line[-1] == '\r':
269 269 line = line[:-1]
270 270 return line
271 271
272 272 def prompt(self, msg, choices=None, default="y"):
273 273 """Prompt user with msg, read response, and ensure it matches
274 274 one of the provided choices. choices is a sequence of acceptable
275 275 responses with the format: ('&None', 'E&xec', 'Sym&link')
276 276 No sequence implies no response checking. Responses are case
277 277 insensitive. If ui is not interactive, the default is returned.
278 278 """
279 279 if not self.interactive():
280 280 self.note(msg, ' ', default, "\n")
281 281 return default
282 282 while True:
283 283 try:
284 284 r = self._readline(msg + ' ')
285 285 if not r:
286 286 return default
287 287 if not choices:
288 288 return r
289 289 resps = [s[s.index('&')+1].lower() for s in choices]
290 290 if r.lower() in resps:
291 291 return r.lower()
292 292 else:
293 293 self.write(_("unrecognized response\n"))
294 294 except EOFError:
295 295 raise util.Abort(_('response expected'))
296 296
297 297 def getpass(self, prompt=None, default=None):
298 298 if not self.interactive(): return default
299 299 try:
300 300 return getpass.getpass(prompt or _('password: '))
301 301 except EOFError:
302 302 raise util.Abort(_('response expected'))
303 303 def status(self, *msg):
304 304 if not self.quiet: self.write(*msg)
305 305 def warn(self, *msg):
306 306 self.write_err(*msg)
307 307 def note(self, *msg):
308 308 if self.verbose: self.write(*msg)
309 309 def debug(self, *msg):
310 310 if self.debugflag: self.write(*msg)
311 311 def edit(self, text, user):
312 312 (fd, name) = tempfile.mkstemp(prefix="hg-editor-", suffix=".txt",
313 313 text=True)
314 314 try:
315 315 f = os.fdopen(fd, "w")
316 316 f.write(text)
317 317 f.close()
318 318
319 319 editor = self.geteditor()
320 320
321 321 util.system("%s \"%s\"" % (editor, name),
322 322 environ={'HGUSER': user},
323 323 onerr=util.Abort, errprefix=_("edit failed"))
324 324
325 325 f = open(name)
326 326 t = f.read()
327 327 f.close()
328 328 finally:
329 329 os.unlink(name)
330 330
331 331 return t
332 332
333 333 def traceback(self):
334 334 '''print exception traceback if traceback printing enabled.
335 335 only to call in exception handler. returns true if traceback
336 336 printed.'''
337 337 if self._traceback:
338 338 traceback.print_exc()
339 339 return self._traceback
340 340
341 341 def geteditor(self):
342 342 '''return editor to use'''
343 343 return (os.environ.get("HGEDITOR") or
344 344 self.config("ui", "editor") or
345 345 os.environ.get("VISUAL") or
346 346 os.environ.get("EDITOR", "vi"))
@@ -1,284 +1,284 b''
1 1 # windows.py - Windows utility function implementations for Mercurial
2 2 #
3 3 # Copyright 2005-2009 Matt Mackall <mpm@selenic.com> and others
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2, incorporated herein by reference.
7 7
8 8 from i18n import _
9 9 import osutil, error
10 10 import errno, msvcrt, os, re, sys
11 11
12 12 nulldev = 'NUL:'
13 13 umask = 002
14 14
15 15 # wrap osutil.posixfile to provide friendlier exceptions
16 16 def posixfile(name, mode='r', buffering=-1):
17 17 try:
18 18 return osutil.posixfile(name, mode, buffering)
19 19 except WindowsError, err:
20 20 raise WinIOError(err)
21 21 posixfile.__doc__ = osutil.posixfile.__doc__
22 22
23 23 class winstdout:
24 24 '''stdout on windows misbehaves if sent through a pipe'''
25 25
26 26 def __init__(self, fp):
27 27 self.fp = fp
28 28
29 29 def __getattr__(self, key):
30 30 return getattr(self.fp, key)
31 31
32 32 def close(self):
33 33 try:
34 34 self.fp.close()
35 35 except: pass
36 36
37 37 def write(self, s):
38 38 try:
39 39 # This is workaround for "Not enough space" error on
40 40 # writing large size of data to console.
41 41 limit = 16000
42 42 l = len(s)
43 43 start = 0
44 44 while start < l:
45 45 end = start + limit
46 46 self.fp.write(s[start:end])
47 47 start = end
48 48 except IOError, inst:
49 49 if inst.errno != 0: raise
50 50 self.close()
51 51 raise IOError(errno.EPIPE, 'Broken pipe')
52 52
53 53 def flush(self):
54 54 try:
55 55 return self.fp.flush()
56 56 except IOError, inst:
57 57 if inst.errno != errno.EINVAL: raise
58 58 self.close()
59 59 raise IOError(errno.EPIPE, 'Broken pipe')
60 60
61 61 sys.stdout = winstdout(sys.stdout)
62 62
63 63 def _is_win_9x():
64 64 '''return true if run on windows 95, 98 or me.'''
65 65 try:
66 66 return sys.getwindowsversion()[3] == 1
67 67 except AttributeError:
68 68 return 'command' in os.environ.get('comspec', '')
69 69
70 70 def openhardlinks():
71 71 return not _is_win_9x() and "win32api" in globals()
72 72
73 73 def system_rcpath():
74 74 try:
75 75 return system_rcpath_win32()
76 76 except:
77 77 return [r'c:\mercurial\mercurial.ini']
78 78
79 79 def user_rcpath():
80 80 '''return os-specific hgrc search path to the user dir'''
81 81 try:
82 82 path = user_rcpath_win32()
83 83 except:
84 84 home = os.path.expanduser('~')
85 85 path = [os.path.join(home, 'mercurial.ini'),
86 86 os.path.join(home, '.hgrc')]
87 87 userprofile = os.environ.get('USERPROFILE')
88 88 if userprofile:
89 89 path.append(os.path.join(userprofile, 'mercurial.ini'))
90 90 path.append(os.path.join(userprofile, '.hgrc'))
91 91 return path
92 92
93 93 def parse_patch_output(output_line):
94 94 """parses the output produced by patch and returns the file name"""
95 95 pf = output_line[14:]
96 96 if pf[0] == '`':
97 97 pf = pf[1:-1] # Remove the quotes
98 98 return pf
99 99
100 100 def sshargs(sshcmd, host, user, port):
101 101 '''Build argument list for ssh or Plink'''
102 102 pflag = 'plink' in sshcmd.lower() and '-P' or '-p'
103 103 args = user and ("%s@%s" % (user, host)) or host
104 104 return port and ("%s %s %s" % (args, pflag, port)) or args
105 105
106 106 def testpid(pid):
107 107 '''return False if pid dead, True if running or not known'''
108 108 return True
109 109
110 110 def set_flags(f, l, x):
111 111 pass
112 112
113 113 def set_binary(fd):
114 114 # When run without console, pipes may expose invalid
115 115 # fileno(), usually set to -1.
116 116 if hasattr(fd, 'fileno') and fd.fileno() >= 0:
117 117 msvcrt.setmode(fd.fileno(), os.O_BINARY)
118 118
119 119 def pconvert(path):
120 120 return '/'.join(path.split(os.sep))
121 121
122 122 def localpath(path):
123 123 return path.replace('/', '\\')
124 124
125 125 def normpath(path):
126 126 return pconvert(os.path.normpath(path))
127 127
128 128 def samestat(s1, s2):
129 129 return False
130 130
131 131 # A sequence of backslashes is special iff it precedes a double quote:
132 132 # - if there's an even number of backslashes, the double quote is not
133 133 # quoted (i.e. it ends the quoted region)
134 134 # - if there's an odd number of backslashes, the double quote is quoted
135 135 # - in both cases, every pair of backslashes is unquoted into a single
136 136 # backslash
137 137 # (See http://msdn2.microsoft.com/en-us/library/a1y7w461.aspx )
138 138 # So, to quote a string, we must surround it in double quotes, double
139 139 # the number of backslashes that preceed double quotes and add another
140 140 # backslash before every double quote (being careful with the double
141 141 # quote we've appended to the end)
142 142 _quotere = None
143 143 def shellquote(s):
144 144 global _quotere
145 145 if _quotere is None:
146 146 _quotere = re.compile(r'(\\*)("|\\$)')
147 147 return '"%s"' % _quotere.sub(r'\1\1\\\2', s)
148 148
149 149 def quotecommand(cmd):
150 150 """Build a command string suitable for os.popen* calls."""
151 151 # The extra quotes are needed because popen* runs the command
152 152 # through the current COMSPEC. cmd.exe suppress enclosing quotes.
153 153 return '"' + cmd + '"'
154 154
155 155 def popen(command, mode='r'):
156 156 # Work around "popen spawned process may not write to stdout
157 157 # under windows"
158 158 # http://bugs.python.org/issue1366
159 159 command += " 2> %s" % nulldev
160 160 return os.popen(quotecommand(command), mode)
161 161
162 162 def explain_exit(code):
163 163 return _("exited with status %d") % code, code
164 164
165 165 # if you change this stub into a real check, please try to implement the
166 166 # username and groupname functions above, too.
167 def isowner(fp, st=None):
167 def isowner(st):
168 168 return True
169 169
170 170 def find_exe(command):
171 171 '''Find executable for command searching like cmd.exe does.
172 172 If command is a basename then PATH is searched for command.
173 173 PATH isn't searched if command is an absolute or relative path.
174 174 An extension from PATHEXT is found and added if not present.
175 175 If command isn't found None is returned.'''
176 176 pathext = os.environ.get('PATHEXT', '.COM;.EXE;.BAT;.CMD')
177 177 pathexts = [ext for ext in pathext.lower().split(os.pathsep)]
178 178 if os.path.splitext(command)[1].lower() in pathexts:
179 179 pathexts = ['']
180 180
181 181 def findexisting(pathcommand):
182 182 'Will append extension (if needed) and return existing file'
183 183 for ext in pathexts:
184 184 executable = pathcommand + ext
185 185 if os.path.exists(executable):
186 186 return executable
187 187 return None
188 188
189 189 if os.sep in command:
190 190 return findexisting(command)
191 191
192 192 for path in os.environ.get('PATH', '').split(os.pathsep):
193 193 executable = findexisting(os.path.join(path, command))
194 194 if executable is not None:
195 195 return executable
196 196 return None
197 197
198 198 def set_signal_handler():
199 199 try:
200 200 set_signal_handler_win32()
201 201 except NameError:
202 202 pass
203 203
204 204 def statfiles(files):
205 205 '''Stat each file in files and yield stat or None if file does not exist.
206 206 Cluster and cache stat per directory to minimize number of OS stat calls.'''
207 207 ncase = os.path.normcase
208 208 sep = os.sep
209 209 dircache = {} # dirname -> filename -> status | None if file does not exist
210 210 for nf in files:
211 211 nf = ncase(nf)
212 212 pos = nf.rfind(sep)
213 213 if pos == -1:
214 214 dir, base = '.', nf
215 215 else:
216 216 dir, base = nf[:pos+1], nf[pos+1:]
217 217 cache = dircache.get(dir, None)
218 218 if cache is None:
219 219 try:
220 220 dmap = dict([(ncase(n), s)
221 221 for n, k, s in osutil.listdir(dir, True)])
222 222 except OSError, err:
223 223 # handle directory not found in Python version prior to 2.5
224 224 # Python <= 2.4 returns native Windows code 3 in errno
225 225 # Python >= 2.5 returns ENOENT and adds winerror field
226 226 # EINVAL is raised if dir is not a directory.
227 227 if err.errno not in (3, errno.ENOENT, errno.EINVAL,
228 228 errno.ENOTDIR):
229 229 raise
230 230 dmap = {}
231 231 cache = dircache.setdefault(dir, dmap)
232 232 yield cache.get(base, None)
233 233
234 234 def getuser():
235 235 '''return name of current user'''
236 236 raise error.Abort(_('user name not available - set USERNAME '
237 237 'environment variable'))
238 238
239 239 def username(uid=None):
240 240 """Return the name of the user with the given uid.
241 241
242 242 If uid is None, return the name of the current user."""
243 243 return None
244 244
245 245 def groupname(gid=None):
246 246 """Return the name of the group with the given gid.
247 247
248 248 If gid is None, return the name of the current group."""
249 249 return None
250 250
251 251 def _removedirs(name):
252 252 """special version of os.removedirs that does not remove symlinked
253 253 directories or junction points if they actually contain files"""
254 254 if osutil.listdir(name):
255 255 return
256 256 os.rmdir(name)
257 257 head, tail = os.path.split(name)
258 258 if not tail:
259 259 head, tail = os.path.split(head)
260 260 while head and tail:
261 261 try:
262 262 if osutil.listdir(name):
263 263 return
264 264 os.rmdir(head)
265 265 except:
266 266 break
267 267 head, tail = os.path.split(head)
268 268
269 269 def unlink(f):
270 270 """unlink and remove the directory if it is empty"""
271 271 os.unlink(f)
272 272 # try removing directories that might now be empty
273 273 try:
274 274 _removedirs(os.path.dirname(f))
275 275 except OSError:
276 276 pass
277 277
278 278 try:
279 279 # override functions with win32 versions if possible
280 280 from win32 import *
281 281 except ImportError:
282 282 pass
283 283
284 284 expandglobs = True
@@ -1,189 +1,189 b''
1 1 # Since it's not easy to write a test that portably deals
2 2 # with files from different users/groups, we cheat a bit by
3 3 # monkey-patching some functions in the util module
4 4
5 5 import os
6 6 from mercurial import ui, util, error
7 7
8 8 hgrc = os.environ['HGRCPATH']
9 9 f = open(hgrc)
10 10 basehgrc = f.read()
11 11 f.close()
12 12
13 13 def testui(user='foo', group='bar', tusers=(), tgroups=(),
14 14 cuser='foo', cgroup='bar', debug=False, silent=False):
15 15 # user, group => owners of the file
16 16 # tusers, tgroups => trusted users/groups
17 17 # cuser, cgroup => user/group of the current process
18 18
19 19 # write a global hgrc with the list of trusted users/groups and
20 20 # some setting so that we can be sure it was read
21 21 f = open(hgrc, 'w')
22 22 f.write(basehgrc)
23 23 f.write('\n[paths]\n')
24 24 f.write('global = /some/path\n\n')
25 25
26 26 if tusers or tgroups:
27 27 f.write('[trusted]\n')
28 28 if tusers:
29 29 f.write('users = %s\n' % ', '.join(tusers))
30 30 if tgroups:
31 31 f.write('groups = %s\n' % ', '.join(tgroups))
32 32 f.close()
33 33
34 34 # override the functions that give names to uids and gids
35 35 def username(uid=None):
36 36 if uid is None:
37 37 return cuser
38 38 return user
39 39 util.username = username
40 40
41 41 def groupname(gid=None):
42 42 if gid is None:
43 43 return 'bar'
44 44 return group
45 45 util.groupname = groupname
46 46
47 def isowner(fp, st=None):
47 def isowner(st):
48 48 return user == cuser
49 49 util.isowner = isowner
50 50
51 51 # try to read everything
52 52 #print '# File belongs to user %s, group %s' % (user, group)
53 53 #print '# trusted users = %s; trusted groups = %s' % (tusers, tgroups)
54 54 kind = ('different', 'same')
55 55 who = ('', 'user', 'group', 'user and the group')
56 56 trusted = who[(user in tusers) + 2*(group in tgroups)]
57 57 if trusted:
58 58 trusted = ', but we trust the ' + trusted
59 59 print '# %s user, %s group%s' % (kind[user == cuser], kind[group == cgroup],
60 60 trusted)
61 61
62 62 u = ui.ui()
63 63 u.setconfig('ui', 'debug', str(bool(debug)))
64 64 u.readconfig('.hg/hgrc')
65 65 if silent:
66 66 return u
67 67 print 'trusted'
68 68 for name, path in u.configitems('paths'):
69 69 print ' ', name, '=', path
70 70 print 'untrusted'
71 71 for name, path in u.configitems('paths', untrusted=True):
72 72 print '.',
73 73 u.config('paths', name) # warning with debug=True
74 74 print '.',
75 75 u.config('paths', name, untrusted=True) # no warnings
76 76 print name, '=', path
77 77 print
78 78
79 79 return u
80 80
81 81 os.mkdir('repo')
82 82 os.chdir('repo')
83 83 os.mkdir('.hg')
84 84 f = open('.hg/hgrc', 'w')
85 85 f.write('[paths]\n')
86 86 f.write('local = /another/path\n\n')
87 87 f.close()
88 88
89 89 #print '# Everything is run by user foo, group bar\n'
90 90
91 91 # same user, same group
92 92 testui()
93 93 # same user, different group
94 94 testui(group='def')
95 95 # different user, same group
96 96 testui(user='abc')
97 97 # ... but we trust the group
98 98 testui(user='abc', tgroups=['bar'])
99 99 # different user, different group
100 100 testui(user='abc', group='def')
101 101 # ... but we trust the user
102 102 testui(user='abc', group='def', tusers=['abc'])
103 103 # ... but we trust the group
104 104 testui(user='abc', group='def', tgroups=['def'])
105 105 # ... but we trust the user and the group
106 106 testui(user='abc', group='def', tusers=['abc'], tgroups=['def'])
107 107 # ... but we trust all users
108 108 print '# we trust all users'
109 109 testui(user='abc', group='def', tusers=['*'])
110 110 # ... but we trust all groups
111 111 print '# we trust all groups'
112 112 testui(user='abc', group='def', tgroups=['*'])
113 113 # ... but we trust the whole universe
114 114 print '# we trust all users and groups'
115 115 testui(user='abc', group='def', tusers=['*'], tgroups=['*'])
116 116 # ... check that users and groups are in different namespaces
117 117 print "# we don't get confused by users and groups with the same name"
118 118 testui(user='abc', group='def', tusers=['def'], tgroups=['abc'])
119 119 # ... lists of user names work
120 120 print "# list of user names"
121 121 testui(user='abc', group='def', tusers=['foo', 'xyz', 'abc', 'bleh'],
122 122 tgroups=['bar', 'baz', 'qux'])
123 123 # ... lists of group names work
124 124 print "# list of group names"
125 125 testui(user='abc', group='def', tusers=['foo', 'xyz', 'bleh'],
126 126 tgroups=['bar', 'def', 'baz', 'qux'])
127 127
128 128 print "# Can't figure out the name of the user running this process"
129 129 testui(user='abc', group='def', cuser=None)
130 130
131 131 print "# prints debug warnings"
132 132 u = testui(user='abc', group='def', cuser='foo', debug=True)
133 133
134 134 print "# ui.readconfig sections"
135 135 filename = 'foobar'
136 136 f = open(filename, 'w')
137 137 f.write('[foobar]\n')
138 138 f.write('baz = quux\n')
139 139 f.close()
140 140 u.readconfig(filename, sections = ['foobar'])
141 141 print u.config('foobar', 'baz')
142 142
143 143 print
144 144 print "# read trusted, untrusted, new ui, trusted"
145 145 u = ui.ui()
146 146 u.setconfig('ui', 'debug', 'on')
147 147 u.readconfig(filename)
148 148 u2 = u.copy()
149 149 def username(uid=None):
150 150 return 'foo'
151 151 util.username = username
152 152 u2.readconfig('.hg/hgrc')
153 153 print 'trusted:'
154 154 print u2.config('foobar', 'baz')
155 155 print 'untrusted:'
156 156 print u2.config('foobar', 'baz', untrusted=True)
157 157
158 158 print
159 159 print "# error handling"
160 160
161 161 def assertraises(f, exc=util.Abort):
162 162 try:
163 163 f()
164 164 except exc, inst:
165 165 print 'raised', inst.__class__.__name__
166 166 else:
167 167 print 'no exception?!'
168 168
169 169 print "# file doesn't exist"
170 170 os.unlink('.hg/hgrc')
171 171 assert not os.path.exists('.hg/hgrc')
172 172 testui(debug=True, silent=True)
173 173 testui(user='abc', group='def', debug=True, silent=True)
174 174
175 175 print
176 176 print "# parse error"
177 177 f = open('.hg/hgrc', 'w')
178 178 f.write('foo')
179 179 f.close()
180 180
181 181 try:
182 182 testui(user='abc', group='def', silent=True)
183 183 except error.ConfigError, inst:
184 184 print inst
185 185
186 186 try:
187 187 testui(debug=True, silent=True)
188 188 except error.ConfigError, inst:
189 189 print inst
General Comments 0
You need to be logged in to leave comments. Login now