##// END OF EJS Templates
hgweb: extract path traversal checking into standalone function...
Gregory Szorc -
r31790:62f9679d default
parent child Browse files
Show More
@@ -1,224 +1,232 b''
1 1 # hgweb/common.py - Utility functions needed by hgweb_mod and hgwebdir_mod
2 2 #
3 3 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
4 4 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import base64
12 12 import errno
13 13 import mimetypes
14 14 import os
15 15 import uuid
16 16
17 17 from .. import (
18 18 encoding,
19 19 pycompat,
20 20 util,
21 21 )
22 22
23 23 httpserver = util.httpserver
24 24
25 25 HTTP_OK = 200
26 26 HTTP_NOT_MODIFIED = 304
27 27 HTTP_BAD_REQUEST = 400
28 28 HTTP_UNAUTHORIZED = 401
29 29 HTTP_FORBIDDEN = 403
30 30 HTTP_NOT_FOUND = 404
31 31 HTTP_METHOD_NOT_ALLOWED = 405
32 32 HTTP_SERVER_ERROR = 500
33 33
34 34
35 35 def ismember(ui, username, userlist):
36 36 """Check if username is a member of userlist.
37 37
38 38 If userlist has a single '*' member, all users are considered members.
39 39 Can be overridden by extensions to provide more complex authorization
40 40 schemes.
41 41 """
42 42 return userlist == ['*'] or username in userlist
43 43
44 44 def checkauthz(hgweb, req, op):
45 45 '''Check permission for operation based on request data (including
46 46 authentication info). Return if op allowed, else raise an ErrorResponse
47 47 exception.'''
48 48
49 49 user = req.env.get('REMOTE_USER')
50 50
51 51 deny_read = hgweb.configlist('web', 'deny_read')
52 52 if deny_read and (not user or ismember(hgweb.repo.ui, user, deny_read)):
53 53 raise ErrorResponse(HTTP_UNAUTHORIZED, 'read not authorized')
54 54
55 55 allow_read = hgweb.configlist('web', 'allow_read')
56 56 if allow_read and (not ismember(hgweb.repo.ui, user, allow_read)):
57 57 raise ErrorResponse(HTTP_UNAUTHORIZED, 'read not authorized')
58 58
59 59 if op == 'pull' and not hgweb.allowpull:
60 60 raise ErrorResponse(HTTP_UNAUTHORIZED, 'pull not authorized')
61 61 elif op == 'pull' or op is None: # op is None for interface requests
62 62 return
63 63
64 64 # enforce that you can only push using POST requests
65 65 if req.env['REQUEST_METHOD'] != 'POST':
66 66 msg = 'push requires POST request'
67 67 raise ErrorResponse(HTTP_METHOD_NOT_ALLOWED, msg)
68 68
69 69 # require ssl by default for pushing, auth info cannot be sniffed
70 70 # and replayed
71 71 scheme = req.env.get('wsgi.url_scheme')
72 72 if hgweb.configbool('web', 'push_ssl', True) and scheme != 'https':
73 73 raise ErrorResponse(HTTP_FORBIDDEN, 'ssl required')
74 74
75 75 deny = hgweb.configlist('web', 'deny_push')
76 76 if deny and (not user or ismember(hgweb.repo.ui, user, deny)):
77 77 raise ErrorResponse(HTTP_UNAUTHORIZED, 'push not authorized')
78 78
79 79 allow = hgweb.configlist('web', 'allow_push')
80 80 if not (allow and ismember(hgweb.repo.ui, user, allow)):
81 81 raise ErrorResponse(HTTP_UNAUTHORIZED, 'push not authorized')
82 82
83 83 # Hooks for hgweb permission checks; extensions can add hooks here.
84 84 # Each hook is invoked like this: hook(hgweb, request, operation),
85 85 # where operation is either read, pull or push. Hooks should either
86 86 # raise an ErrorResponse exception, or just return.
87 87 #
88 88 # It is possible to do both authentication and authorization through
89 89 # this.
90 90 permhooks = [checkauthz]
91 91
92 92
93 93 class ErrorResponse(Exception):
94 94 def __init__(self, code, message=None, headers=None):
95 95 if message is None:
96 96 message = _statusmessage(code)
97 97 Exception.__init__(self, message)
98 98 self.code = code
99 99 if headers is None:
100 100 headers = []
101 101 self.headers = headers
102 102
103 103 class continuereader(object):
104 104 def __init__(self, f, write):
105 105 self.f = f
106 106 self._write = write
107 107 self.continued = False
108 108
109 109 def read(self, amt=-1):
110 110 if not self.continued:
111 111 self.continued = True
112 112 self._write('HTTP/1.1 100 Continue\r\n\r\n')
113 113 return self.f.read(amt)
114 114
115 115 def __getattr__(self, attr):
116 116 if attr in ('close', 'readline', 'readlines', '__iter__'):
117 117 return getattr(self.f, attr)
118 118 raise AttributeError
119 119
120 120 def _statusmessage(code):
121 121 responses = httpserver.basehttprequesthandler.responses
122 122 return responses.get(code, ('Error', 'Unknown error'))[0]
123 123
124 124 def statusmessage(code, message=None):
125 125 return '%d %s' % (code, message or _statusmessage(code))
126 126
127 127 def get_stat(spath, fn):
128 128 """stat fn if it exists, spath otherwise"""
129 129 cl_path = os.path.join(spath, fn)
130 130 if os.path.exists(cl_path):
131 131 return os.stat(cl_path)
132 132 else:
133 133 return os.stat(spath)
134 134
135 135 def get_mtime(spath):
136 136 return get_stat(spath, "00changelog.i").st_mtime
137 137
138 def ispathsafe(path):
139 """Determine if a path is safe to use for filesystem access."""
140 parts = path.split('/')
141 for part in parts:
142 if (part in ('', os.curdir, os.pardir) or
143 pycompat.ossep in part or
144 pycompat.osaltsep is not None and pycompat.osaltsep in part):
145 return False
146
147 return True
148
138 149 def staticfile(directory, fname, req):
139 150 """return a file inside directory with guessed Content-Type header
140 151
141 152 fname always uses '/' as directory separator and isn't allowed to
142 153 contain unusual path components.
143 154 Content-Type is guessed using the mimetypes module.
144 155 Return an empty string if fname is illegal or file not found.
145 156
146 157 """
147 parts = fname.split('/')
148 for part in parts:
149 if (part in ('', os.curdir, os.pardir) or
150 pycompat.ossep in part or
151 pycompat.osaltsep is not None and pycompat.osaltsep in part):
152 return
153 fpath = os.path.join(*parts)
158 if not ispathsafe(fname):
159 return
160
161 fpath = os.path.join(*fname.split('/'))
154 162 if isinstance(directory, str):
155 163 directory = [directory]
156 164 for d in directory:
157 165 path = os.path.join(d, fpath)
158 166 if os.path.exists(path):
159 167 break
160 168 try:
161 169 os.stat(path)
162 170 ct = mimetypes.guess_type(path)[0] or "text/plain"
163 171 with open(path, 'rb') as fh:
164 172 data = fh.read()
165 173
166 174 req.respond(HTTP_OK, ct, body=data)
167 175 except TypeError:
168 176 raise ErrorResponse(HTTP_SERVER_ERROR, 'illegal filename')
169 177 except OSError as err:
170 178 if err.errno == errno.ENOENT:
171 179 raise ErrorResponse(HTTP_NOT_FOUND)
172 180 else:
173 181 raise ErrorResponse(HTTP_SERVER_ERROR, err.strerror)
174 182
175 183 def paritygen(stripecount, offset=0):
176 184 """count parity of horizontal stripes for easier reading"""
177 185 if stripecount and offset:
178 186 # account for offset, e.g. due to building the list in reverse
179 187 count = (stripecount + offset) % stripecount
180 188 parity = (stripecount + offset) / stripecount & 1
181 189 else:
182 190 count = 0
183 191 parity = 0
184 192 while True:
185 193 yield parity
186 194 count += 1
187 195 if stripecount and count >= stripecount:
188 196 parity = 1 - parity
189 197 count = 0
190 198
191 199 def get_contact(config):
192 200 """Return repo contact information or empty string.
193 201
194 202 web.contact is the primary source, but if that is not set, try
195 203 ui.username or $EMAIL as a fallback to display something useful.
196 204 """
197 205 return (config("web", "contact") or
198 206 config("ui", "username") or
199 207 encoding.environ.get("EMAIL") or "")
200 208
201 209 def caching(web, req):
202 210 tag = 'W/"%s"' % web.mtime
203 211 if req.env.get('HTTP_IF_NONE_MATCH') == tag:
204 212 raise ErrorResponse(HTTP_NOT_MODIFIED)
205 213 req.headers.append(('ETag', tag))
206 214
207 215 def cspvalues(ui):
208 216 """Obtain the Content-Security-Policy header and nonce value.
209 217
210 218 Returns a 2-tuple of the CSP header value and the nonce value.
211 219
212 220 First value is ``None`` if CSP isn't enabled. Second value is ``None``
213 221 if CSP isn't enabled or if the CSP header doesn't need a nonce.
214 222 """
215 223 # Don't allow untrusted CSP setting since it be disable protections
216 224 # from a trusted/global source.
217 225 csp = ui.config('web', 'csp', untrusted=False)
218 226 nonce = None
219 227
220 228 if csp and '%nonce%' in csp:
221 229 nonce = base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip('=')
222 230 csp = csp.replace('%nonce%', nonce)
223 231
224 232 return csp, nonce
General Comments 0
You need to be logged in to leave comments. Login now