cleanup: use isinstance instead of comparing types...
Mads Kiilerich
r7884:280c8767 default
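The change is the same pattern in both files below: direct type comparisons such as type(value) == list and type(val) == unicode are replaced by isinstance() checks. A minimal sketch of the difference, using an illustrative str subclass that is not part of the Kallithea code base:

    class Markup(str):
        """Illustrative str subclass, e.g. an HTML-safe string type."""
        pass

    value = Markup("hello")

    # A direct type comparison rejects subclasses:
    print(type(value) == str)               # False - Markup is not exactly str
    # isinstance() accepts the class and any subclass, as PEP 8 recommends:
    print(isinstance(value, str))           # True
    # isinstance() also takes a tuple of types:
    print(isinstance(value, (str, bytes)))  # True
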
@@ -1,454 +1,454 b''
1 1 # The code in this module is entirely lifted from the Lamson project
2 2 # (http://lamsonproject.org/). Its copyright is:
3 3
4 4 # Copyright (c) 2008, Zed A. Shaw
5 5 # All rights reserved.
6 6
7 7 # It is provided under this license:
8 8
9 9 # Redistribution and use in source and binary forms, with or without
10 10 # modification, are permitted provided that the following conditions are met:
11 11
12 12 # * Redistributions of source code must retain the above copyright notice, this
13 13 # list of conditions and the following disclaimer.
14 14
15 15 # * Redistributions in binary form must reproduce the above copyright notice,
16 16 # this list of conditions and the following disclaimer in the documentation
17 17 # and/or other materials provided with the distribution.
18 18
19 19 # * Neither the name of the Zed A. Shaw nor the names of its contributors may
20 20 # be used to endorse or promote products derived from this software without
21 21 # specific prior written permission.
22 22
23 23 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 24 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 25 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
26 26 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
27 27 # COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
28 28 # INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
29 29 # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
30 30 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
31 31 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
32 32 # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 33 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 34 # POSSIBILITY OF SUCH DAMAGE.
35 35
36 36 import mimetypes
37 37 import os
38 38 import string
39 39 from email import encoders
40 40 from email.charset import Charset
41 41 from email.mime.base import MIMEBase
42 42 from email.utils import parseaddr
43 43
44 44
45 45 ADDRESS_HEADERS_WHITELIST = ['From', 'To', 'Delivered-To', 'Cc']
46 46 DEFAULT_ENCODING = "utf-8"
47 47 VALUE_IS_EMAIL_ADDRESS = lambda v: '@' in v
48 48
49 49
50 50 def normalize_header(header):
51 51 return string.capwords(header.lower(), '-')
52 52
53 53
54 54 class EncodingError(Exception):
55 55 """Thrown when there is an encoding error."""
56 56 pass
57 57
58 58
59 59 class MailBase(object):
60 60 """MailBase is used as the basis of lamson.mail and contains the basics of
61 61 encoding an email. You actually can do all your email processing with this
62 62 class, but it's more raw.
63 63 """
64 64 def __init__(self, items=()):
65 65 self.headers = dict(items)
66 66 self.parts = []
67 67 self.body = None
68 68 self.content_encoding = {'Content-Type': (None, {}),
69 69 'Content-Disposition': (None, {}),
70 70 'Content-Transfer-Encoding': (None, {})}
71 71
72 72 def __getitem__(self, key):
73 73 return self.headers.get(normalize_header(key), None)
74 74
75 75 def __len__(self):
76 76 return len(self.headers)
77 77
78 78 def __iter__(self):
79 79 return iter(self.headers)
80 80
81 81 def __contains__(self, key):
82 82 return normalize_header(key) in self.headers
83 83
84 84 def __setitem__(self, key, value):
85 85 self.headers[normalize_header(key)] = value
86 86
87 87 def __delitem__(self, key):
88 88 del self.headers[normalize_header(key)]
89 89
90 90 def __nonzero__(self):
91 91 return self.body is not None or len(self.headers) > 0 or len(self.parts) > 0
92 92
93 93 def keys(self):
94 94 """Returns the sorted keys."""
95 95 return sorted(self.headers.keys())
96 96
97 97 def attach_file(self, filename, data, ctype, disposition):
98 98 """
99 99 A file attachment is a raw attachment with a disposition that
100 100 indicates the file name.
101 101 """
102 102 assert filename, "You can't attach a file without a filename."
103 103 ctype = ctype.lower()
104 104
105 105 part = MailBase()
106 106 part.body = data
107 107 part.content_encoding['Content-Type'] = (ctype, {'name': filename})
108 108 part.content_encoding['Content-Disposition'] = (disposition,
109 109 {'filename': filename})
110 110 self.parts.append(part)
111 111
112 112 def attach_text(self, data, ctype):
113 113 """
114 114 This attaches a simpler text encoded part, which doesn't have a
115 115 filename.
116 116 """
117 117 ctype = ctype.lower()
118 118
119 119 part = MailBase()
120 120 part.body = data
121 121 part.content_encoding['Content-Type'] = (ctype, {})
122 122 self.parts.append(part)
123 123
124 124 def walk(self):
125 125 for p in self.parts:
126 126 yield p
127 127 for x in p.walk():
128 128 yield x
129 129
130 130
131 131 class MailResponse(object):
132 132 """
133 133 You are given MailResponse objects from the lamson.view methods, and
134 134 whenever you want to generate an email to send to someone. It has the
135 135 same basic functionality as MailRequest, but it is designed to be written
136 136 to, rather than read from (although you can do both).
137 137
138 138 You can easily set a Body or Html during creation or after by passing it
139 139 as __init__ parameters, or by setting those attributes.
140 140
141 141 You can initially set the From, To, and Subject, but they are headers so
142 142 use the dict notation to change them: msg['From'] = 'joe@example.com'.
143 143
144 144 The message is not fully crafted until right when you convert it with
145 145 MailResponse.to_message. This lets you change it and work with it, then
146 146 send it out when it's ready.
147 147 """
148 148 def __init__(self, To=None, From=None, Subject=None, Body=None, Html=None,
149 149 separator="; "):
150 150 self.Body = Body
151 151 self.Html = Html
152 152 self.base = MailBase([('To', To), ('From', From), ('Subject', Subject)])
153 153 self.multipart = self.Body and self.Html
154 154 self.attachments = []
155 155 self.separator = separator
156 156
157 157 def __contains__(self, key):
158 158 return self.base.__contains__(key)
159 159
160 160 def __getitem__(self, key):
161 161 return self.base.__getitem__(key)
162 162
163 163 def __setitem__(self, key, val):
164 164 return self.base.__setitem__(key, val)
165 165
166 166 def __delitem__(self, name):
167 167 del self.base[name]
168 168
169 169 def attach(self, filename=None, content_type=None, data=None,
170 170 disposition=None):
171 171 """
172 172
173 173 Simplifies attaching files from disk or data as files. To attach
174 174 simple text, simply give data and a content_type. To attach a file,
175 175 give the data/content_type/filename/disposition combination.
176 176
177 177 For convenience, if you don't give data and only a filename, then it
178 178 will read that file's contents when you call to_message() later. If
179 179 you give data and filename then it will assume you've filled data
180 180 with what the file's contents are and filename is just the name to
181 181 use.
182 182 """
183 183
184 184 assert filename or data, ("You must give a filename or some data to "
185 185 "attach.")
186 186 assert data or os.path.exists(filename), ("File doesn't exist, and no "
187 187 "data given.")
188 188
189 189 self.multipart = True
190 190
191 191 if filename and not content_type:
192 192 content_type, encoding = mimetypes.guess_type(filename)
193 193
194 194 assert content_type, ("No content type given, and couldn't guess "
195 195 "from the filename: %r" % filename)
196 196
197 197 self.attachments.append({'filename': filename,
198 198 'content_type': content_type,
199 199 'data': data,
200 200 'disposition': disposition})
201 201
202 202 def attach_part(self, part):
203 203 """
204 204 Attaches a raw MailBase part from a MailRequest (or anywhere)
205 205 so that you can copy it over.
206 206 """
207 207 self.multipart = True
208 208
209 209 self.attachments.append({'filename': None,
210 210 'content_type': None,
211 211 'data': None,
212 212 'disposition': None,
213 213 'part': part,
214 214 })
215 215
216 216 def attach_all_parts(self, mail_request):
217 217 """
218 218 Used for copying the attachment parts of a mail.MailRequest
219 219 object for mailing lists that need to maintain attachments.
220 220 """
221 221 for part in mail_request.all_parts():
222 222 self.attach_part(part)
223 223
224 224 self.base.content_encoding = mail_request.base.content_encoding.copy()
225 225
226 226 def clear(self):
227 227 """
228 228 Clears out the attachments so you can redo them. Use this to keep the
229 229 headers for a series of different messages with different attachments.
230 230 """
231 231 del self.attachments[:]
232 232 del self.base.parts[:]
233 233 self.multipart = False
234 234
235 235 def update(self, message):
236 236 """
237 237 Used to easily set a bunch of headers from another dict-like
238 238 object.
239 239 """
240 240 for k in message.keys():
241 241 self.base[k] = message[k]
242 242
243 243 def __str__(self):
244 244 """
245 245 Converts to a string.
246 246 """
247 247 return self.to_message().as_string()
248 248
249 249 def _encode_attachment(self, filename=None, content_type=None, data=None,
250 250 disposition=None, part=None):
251 251 """
252 252 Used internally to take the attachments mentioned in self.attachments
253 253 and do the actual encoding in a lazy way when you call to_message.
254 254 """
255 255 if part:
256 256 self.base.parts.append(part)
257 257 elif filename:
258 258 if not data:
259 259 data = open(filename).read()
260 260
261 261 self.base.attach_file(filename, data, content_type,
262 262 disposition or 'attachment')
263 263 else:
264 264 self.base.attach_text(data, content_type)
265 265
266 266 ctype = self.base.content_encoding['Content-Type'][0]
267 267
268 268 if ctype and not ctype.startswith('multipart'):
269 269 self.base.content_encoding['Content-Type'] = ('multipart/mixed', {})
270 270
271 271 def to_message(self):
272 272 """
273 273 Figures out all the required steps to finally craft the
274 274 message you need and return it. The resulting message
275 275 is also available as a self.base attribute.
276 276
277 277 What is returned is a Python email API message you can
278 278 use with those APIs. The self.base attribute is the raw
279 279 lamson.encoding.MailBase.
280 280 """
281 281 del self.base.parts[:]
282 282
283 283 if self.Body and self.Html:
284 284 self.multipart = True
285 285 self.base.content_encoding['Content-Type'] = (
286 286 'multipart/alternative', {})
287 287
288 288 if self.multipart:
289 289 self.base.body = None
290 290 if self.Body:
291 291 self.base.attach_text(self.Body, 'text/plain')
292 292
293 293 if self.Html:
294 294 self.base.attach_text(self.Html, 'text/html')
295 295
296 296 for args in self.attachments:
297 297 self._encode_attachment(**args)
298 298
299 299 elif self.Body:
300 300 self.base.body = self.Body
301 301 self.base.content_encoding['Content-Type'] = ('text/plain', {})
302 302
303 303 elif self.Html:
304 304 self.base.body = self.Html
305 305 self.base.content_encoding['Content-Type'] = ('text/html', {})
306 306
307 307 return to_message(self.base, separator=self.separator)
308 308
309 309 def all_parts(self):
310 310 """
311 311 Returns all the encoded parts. Only useful for debugging
312 312 or inspecting after calling to_message().
313 313 """
314 314 return self.base.parts
315 315
316 316 def keys(self):
317 317 return self.base.keys()
318 318
319 319
320 320 def to_message(mail, separator="; "):
321 321 """
322 322 Given a MailBase message, this will construct a MIMEPart
323 323 that is canonicalized for use with the Python email API.
324 324 """
325 325 ctype, params = mail.content_encoding['Content-Type']
326 326
327 327 if not ctype:
328 328 if mail.parts:
329 329 ctype = 'multipart/mixed'
330 330 else:
331 331 ctype = 'text/plain'
332 332 else:
333 333 if mail.parts:
334 334 assert ctype.startswith(("multipart", "message")), \
335 335 "Content type should be multipart or message, not %r" % ctype
336 336
337 337 # adjust the content type according to what it should be now
338 338 mail.content_encoding['Content-Type'] = (ctype, params)
339 339
340 340 try:
341 341 out = MIMEPart(ctype, **params)
342 342 except TypeError as exc: # pragma: no cover
343 343 raise EncodingError("Content-Type malformed, not allowed: %r; "
344 344 "%r (Python ERROR: %s" %
345 345 (ctype, params, exc.message))
346 346
347 347 for k in mail.keys():
348 348 if k in ADDRESS_HEADERS_WHITELIST:
349 349 out[k.encode('ascii')] = header_to_mime_encoding(
350 350 mail[k],
351 351 not_email=False,
352 352 separator=separator
353 353 )
354 354 else:
355 355 out[k.encode('ascii')] = header_to_mime_encoding(
356 356 mail[k],
357 357 not_email=True
358 358 )
359 359
360 360 out.extract_payload(mail)
361 361
362 362 # go through the children
363 363 for part in mail.parts:
364 364 out.attach(to_message(part))
365 365
366 366 return out
367 367
368 368
369 369 class MIMEPart(MIMEBase):
370 370 """
371 371 A reimplementation of nearly everything in email.mime to be more useful
372 372 for actually attaching things. Rather than one class for every type of
373 373 thing you'd encode, there's just this one, and it figures out how to
374 374 encode what you ask it.
375 375 """
376 376 def __init__(self, type, **params):
377 377 self.maintype, self.subtype = type.split('/')
378 378 MIMEBase.__init__(self, self.maintype, self.subtype, **params)
379 379
380 380 def add_text(self, content):
381 381 # this is text, so encode it in canonical form
382 382 try:
383 383 encoded = content.encode('ascii')
384 384 charset = 'ascii'
385 385 except UnicodeError:
386 386 encoded = content.encode('utf-8')
387 387 charset = 'utf-8'
388 388
389 389 self.set_payload(encoded, charset=charset)
390 390
391 391 def extract_payload(self, mail):
392 392 if mail.body is None:
393 393 return # only None, '' is still ok
394 394
395 395 ctype, ctype_params = mail.content_encoding['Content-Type']
396 396 cdisp, cdisp_params = mail.content_encoding['Content-Disposition']
397 397
398 398 assert ctype, ("Extract payload requires that mail.content_encoding "
399 399 "have a valid Content-Type.")
400 400
401 401 if ctype.startswith("text/"):
402 402 self.add_text(mail.body)
403 403 else:
404 404 if cdisp:
405 405 # replicate the content-disposition settings
406 406 self.add_header('Content-Disposition', cdisp, **cdisp_params)
407 407
408 408 self.set_payload(mail.body)
409 409 encoders.encode_base64(self)
410 410
411 411 def __repr__(self):
412 412 return "<MIMEPart '%s/%s': %r, %r, multipart=%r>" % (
413 413 self.subtype,
414 414 self.maintype,
415 415 self['Content-Type'],
416 416 self['Content-Disposition'],
417 417 self.is_multipart())
418 418
419 419
420 420 def header_to_mime_encoding(value, not_email=False, separator=", "):
421 421 if not value:
422 422 return ""
423 423
424 424 encoder = Charset(DEFAULT_ENCODING)
425 if type(value) == list:
425 if isinstance(value, list):
426 426 return separator.join(properly_encode_header(
427 427 v, encoder, not_email) for v in value)
428 428 else:
429 429 return properly_encode_header(value, encoder, not_email)
430 430
431 431
432 432 def properly_encode_header(value, encoder, not_email):
433 433 """
434 434 The only thing special (weird) about this function is that it tries
435 435 to do a fast check to see if the header value has an email address in
436 436 it. Since random headers could have an email address, and email addresses
437 437 have weird special formatting rules, we have to check for it.
438 438
439 439 Normally this works fine, but in Librelist, we need to "obfuscate" email
440 440 addresses by changing the '@' to '-AT-'. This is where
441 441 VALUE_IS_EMAIL_ADDRESS exists. It's a simple lambda returning True/False
442 442 to check if a header value has an email address. If you need to make this
443 443 check different, then change this.
444 444 """
445 445 try:
446 446 return value.encode("ascii")
447 447 except UnicodeEncodeError:
448 448 if not not_email and VALUE_IS_EMAIL_ADDRESS(value):
449 449 # this could have an email address, make sure we don't screw it up
450 450 name, address = parseaddr(value)
451 451 return '"%s" <%s>' % (
452 452 encoder.header_encode(name.encode("utf-8")), address)
453 453
454 454 return encoder.header_encode(value.encode("utf-8"))
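The isinstance(value, list) check changed above sits in header_to_mime_encoding(), which accepts either a single header value or a list of values (for example several To/Cc addresses) and only joins a real list with the separator. A stand-alone sketch of that branch, with properly_encode_header() replaced by a trivial stand-in for illustration:

    def encode_one(value):
        return value.strip()             # stand-in for properly_encode_header()

    def header_to_mime_encoding(value, separator=", "):
        if not value:
            return ""
        if isinstance(value, list):      # the check the commit switches to
            return separator.join(encode_one(v) for v in value)
        return encode_one(value)

    print(header_to_mime_encoding(["a@example.com", " b@example.com"]))
    # -> a@example.com, b@example.com
    print(header_to_mime_encoding("c@example.com"))
    # -> c@example.com
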
@@ -1,2548 +1,2548 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.model.db
16 16 ~~~~~~~~~~~~~~~~~~
17 17
18 18 Database Models for Kallithea
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Apr 08, 2010
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27
28 28 import collections
29 29 import datetime
30 30 import functools
31 31 import hashlib
32 32 import logging
33 33 import os
34 34 import time
35 35 import traceback
36 36
37 37 import ipaddr
38 38 import sqlalchemy
39 39 from beaker.cache import cache_region, region_invalidate
40 40 from sqlalchemy import *
41 41 from sqlalchemy.ext.hybrid import hybrid_property
42 42 from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
43 43 from tg.i18n import lazy_ugettext as _
44 44 from webob.exc import HTTPNotFound
45 45
46 46 import kallithea
47 47 from kallithea.lib.caching_query import FromCache
48 48 from kallithea.lib.compat import json
49 49 from kallithea.lib.exceptions import DefaultUserException
50 50 from kallithea.lib.utils2 import Optional, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_int, safe_str, safe_unicode, str2bool, urlreadable
51 51 from kallithea.lib.vcs import get_backend
52 52 from kallithea.lib.vcs.backends.base import EmptyChangeset
53 53 from kallithea.lib.vcs.utils.helpers import get_scm
54 54 from kallithea.lib.vcs.utils.lazy import LazyProperty
55 55 from kallithea.model.meta import Base, Session
56 56
57 57
58 58 URL_SEP = '/'
59 59 log = logging.getLogger(__name__)
60 60
61 61 #==============================================================================
62 62 # BASE CLASSES
63 63 #==============================================================================
64 64
65 65 _hash_key = lambda k: hashlib.md5(safe_str(k)).hexdigest()
66 66
67 67
68 68 class BaseDbModel(object):
69 69 """
70 70 Base Model for all classes
71 71 """
72 72
73 73 @classmethod
74 74 def _get_keys(cls):
75 75 """return column names for this model """
76 76 return class_mapper(cls).c.keys()
77 77
78 78 def get_dict(self):
79 79 """
80 80 return dict with keys and values corresponding
81 81 to this model data """
82 82
83 83 d = {}
84 84 for k in self._get_keys():
85 85 d[k] = getattr(self, k)
86 86
87 87 # also use __json__() if present to get additional fields
88 88 _json_attr = getattr(self, '__json__', None)
89 89 if _json_attr:
90 90 # update with attributes from __json__
91 91 if callable(_json_attr):
92 92 _json_attr = _json_attr()
93 93 for k, val in _json_attr.iteritems():
94 94 d[k] = val
95 95 return d
96 96
97 97 def get_appstruct(self):
98 98 """return list with keys and values tuples corresponding
99 99 to this model data """
100 100
101 101 return [
102 102 (k, getattr(self, k))
103 103 for k in self._get_keys()
104 104 ]
105 105
106 106 def populate_obj(self, populate_dict):
107 107 """populate model with data from given populate_dict"""
108 108
109 109 for k in self._get_keys():
110 110 if k in populate_dict:
111 111 setattr(self, k, populate_dict[k])
112 112
113 113 @classmethod
114 114 def query(cls):
115 115 return Session().query(cls)
116 116
117 117 @classmethod
118 118 def get(cls, id_):
119 119 if id_:
120 120 return cls.query().get(id_)
121 121
122 122 @classmethod
123 123 def guess_instance(cls, value, callback=None):
124 124 """Haphazardly attempt to convert `value` to a `cls` instance.
125 125
126 126 If `value` is None or already a `cls` instance, return it. If `value`
127 127 is a number (or looks like one if you squint just right), assume it's
128 128 a database primary key and let SQLAlchemy sort things out. Otherwise,
129 129 fall back to resolving it using `callback` (if specified); this could
130 130 e.g. be a function that looks up instances by name (though that won't
131 131 work if the name begins with a digit). Otherwise, raise Exception.
132 132 """
133 133
134 134 if value is None:
135 135 return None
136 136 if isinstance(value, cls):
137 137 return value
138 138 if isinstance(value, (int, long)) or safe_str(value).isdigit():
139 139 return cls.get(value)
140 140 if callback is not None:
141 141 return callback(value)
142 142
143 143 raise Exception(
144 144 'given object must be int, long or Instance of %s '
145 145 'got %s, no callback provided' % (cls, type(value))
146 146 )
147 147
148 148 @classmethod
149 149 def get_or_404(cls, id_):
150 150 try:
151 151 id_ = int(id_)
152 152 except (TypeError, ValueError):
153 153 raise HTTPNotFound
154 154
155 155 res = cls.query().get(id_)
156 156 if res is None:
157 157 raise HTTPNotFound
158 158 return res
159 159
160 160 @classmethod
161 161 def delete(cls, id_):
162 162 obj = cls.query().get(id_)
163 163 Session().delete(obj)
164 164
165 165 def __repr__(self):
166 166 if hasattr(self, '__unicode__'):
167 167 # python repr needs to return str
168 168 try:
169 169 return safe_str(self.__unicode__())
170 170 except UnicodeDecodeError:
171 171 pass
172 172 return '<DB:%s>' % (self.__class__.__name__)
173 173
174 174
175 175 _table_args_default_dict = {'extend_existing': True,
176 176 'mysql_engine': 'InnoDB',
177 177 'mysql_charset': 'utf8',
178 178 'sqlite_autoincrement': True,
179 179 }
180 180
181 181 class Setting(Base, BaseDbModel):
182 182 __tablename__ = 'settings'
183 183 __table_args__ = (
184 184 _table_args_default_dict,
185 185 )
186 186
187 187 SETTINGS_TYPES = {
188 188 'str': safe_str,
189 189 'int': safe_int,
190 190 'unicode': safe_unicode,
191 191 'bool': str2bool,
192 192 'list': functools.partial(aslist, sep=',')
193 193 }
194 194 DEFAULT_UPDATE_URL = ''
195 195
196 196 app_settings_id = Column(Integer(), primary_key=True)
197 197 app_settings_name = Column(String(255), nullable=False, unique=True)
198 198 _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
199 199 _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
200 200
201 201 def __init__(self, key='', val='', type='unicode'):
202 202 self.app_settings_name = key
203 203 self.app_settings_value = val
204 204 self.app_settings_type = type
205 205
206 206 @validates('_app_settings_value')
207 207 def validate_settings_value(self, key, val):
208 assert type(val) == unicode
208 assert isinstance(val, unicode)
209 209 return val
210 210
211 211 @hybrid_property
212 212 def app_settings_value(self):
213 213 v = self._app_settings_value
214 214 _type = self.app_settings_type
215 215 converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
216 216 return converter(v)
217 217
218 218 @app_settings_value.setter
219 219 def app_settings_value(self, val):
220 220 """
221 221 Setter that will always make sure we use unicode in app_settings_value
222 222
223 223 :param val:
224 224 """
225 225 self._app_settings_value = safe_unicode(val)
226 226
227 227 @hybrid_property
228 228 def app_settings_type(self):
229 229 return self._app_settings_type
230 230
231 231 @app_settings_type.setter
232 232 def app_settings_type(self, val):
233 233 if val not in self.SETTINGS_TYPES:
234 234 raise Exception('type must be one of %s got %s'
235 235 % (self.SETTINGS_TYPES.keys(), val))
236 236 self._app_settings_type = val
237 237
238 238 def __unicode__(self):
239 239 return u"<%s('%s:%s[%s]')>" % (
240 240 self.__class__.__name__,
241 241 self.app_settings_name, self.app_settings_value, self.app_settings_type
242 242 )
243 243
244 244 @classmethod
245 245 def get_by_name(cls, key):
246 246 return cls.query() \
247 247 .filter(cls.app_settings_name == key).scalar()
248 248
249 249 @classmethod
250 250 def get_by_name_or_create(cls, key, val='', type='unicode'):
251 251 res = cls.get_by_name(key)
252 252 if res is None:
253 253 res = cls(key, val, type)
254 254 return res
255 255
256 256 @classmethod
257 257 def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
258 258 """
259 259 Creates or updates Kallithea setting. If updates are triggered, it will only
260 260 update parameters that are explicitly set. Optional instance will be skipped.
261 261
262 262 :param key:
263 263 :param val:
264 264 :param type:
265 265 :return:
266 266 """
267 267 res = cls.get_by_name(key)
268 268 if res is None:
269 269 val = Optional.extract(val)
270 270 type = Optional.extract(type)
271 271 res = cls(key, val, type)
272 272 Session().add(res)
273 273 else:
274 274 res.app_settings_name = key
275 275 if not isinstance(val, Optional):
276 276 # update if set
277 277 res.app_settings_value = val
278 278 if not isinstance(type, Optional):
279 279 # update if set
280 280 res.app_settings_type = type
281 281 return res
282 282
283 283 @classmethod
284 284 def get_app_settings(cls, cache=False):
285 285
286 286 ret = cls.query()
287 287
288 288 if cache:
289 289 ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
290 290
291 291 if ret is None:
292 292 raise Exception('Could not get application settings !')
293 293 settings = {}
294 294 for each in ret:
295 295 settings[each.app_settings_name] = \
296 296 each.app_settings_value
297 297
298 298 return settings
299 299
300 300 @classmethod
301 301 def get_auth_settings(cls, cache=False):
302 302 ret = cls.query() \
303 303 .filter(cls.app_settings_name.startswith('auth_')).all()
304 304 fd = {}
305 305 for row in ret:
306 306 fd[row.app_settings_name] = row.app_settings_value
307 307 return fd
308 308
309 309 @classmethod
310 310 def get_default_repo_settings(cls, cache=False, strip_prefix=False):
311 311 ret = cls.query() \
312 312 .filter(cls.app_settings_name.startswith('default_')).all()
313 313 fd = {}
314 314 for row in ret:
315 315 key = row.app_settings_name
316 316 if strip_prefix:
317 317 key = remove_prefix(key, prefix='default_')
318 318 fd.update({key: row.app_settings_value})
319 319
320 320 return fd
321 321
322 322 @classmethod
323 323 def get_server_info(cls):
324 324 import pkg_resources
325 325 import platform
326 326 from kallithea.lib.utils import check_git_version
327 327 mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
328 328 info = {
329 329 'modules': sorted(mods, key=lambda k: k[0].lower()),
330 330 'py_version': platform.python_version(),
331 331 'platform': safe_unicode(platform.platform()),
332 332 'kallithea_version': kallithea.__version__,
333 333 'git_version': safe_unicode(check_git_version()),
334 334 'git_path': kallithea.CONFIG.get('git_path')
335 335 }
336 336 return info
337 337
338 338
339 339 class Ui(Base, BaseDbModel):
340 340 __tablename__ = 'ui'
341 341 __table_args__ = (
342 342 # FIXME: ui_key as key is wrong and should be removed when the corresponding
343 343 # Ui.get_by_key has been replaced by the composite key
344 344 UniqueConstraint('ui_key'),
345 345 UniqueConstraint('ui_section', 'ui_key'),
346 346 _table_args_default_dict,
347 347 )
348 348
349 349 HOOK_UPDATE = 'changegroup.update'
350 350 HOOK_REPO_SIZE = 'changegroup.repo_size'
351 351
352 352 ui_id = Column(Integer(), primary_key=True)
353 353 ui_section = Column(String(255), nullable=False)
354 354 ui_key = Column(String(255), nullable=False)
355 355 ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
356 356 ui_active = Column(Boolean(), nullable=False, default=True)
357 357
358 358 @classmethod
359 359 def get_by_key(cls, section, key):
360 360 """ Return specified Ui object, or None if not found. """
361 361 return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
362 362
363 363 @classmethod
364 364 def get_or_create(cls, section, key):
365 365 """ Return specified Ui object, creating it if necessary. """
366 366 setting = cls.get_by_key(section, key)
367 367 if setting is None:
368 368 setting = cls(ui_section=section, ui_key=key)
369 369 Session().add(setting)
370 370 return setting
371 371
372 372 @classmethod
373 373 def get_builtin_hooks(cls):
374 374 q = cls.query()
375 375 q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
376 376 q = q.filter(cls.ui_section == 'hooks')
377 377 return q.all()
378 378
379 379 @classmethod
380 380 def get_custom_hooks(cls):
381 381 q = cls.query()
382 382 q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
383 383 q = q.filter(cls.ui_section == 'hooks')
384 384 return q.all()
385 385
386 386 @classmethod
387 387 def get_repos_location(cls):
388 388 return cls.get_by_key('paths', '/').ui_value
389 389
390 390 @classmethod
391 391 def create_or_update_hook(cls, key, val):
392 392 new_ui = cls.get_or_create('hooks', key)
393 393 new_ui.ui_active = True
394 394 new_ui.ui_value = val
395 395
396 396 def __repr__(self):
397 397 return '<%s[%s]%s=>%s]>' % (self.__class__.__name__, self.ui_section,
398 398 self.ui_key, self.ui_value)
399 399
400 400
401 401 class User(Base, BaseDbModel):
402 402 __tablename__ = 'users'
403 403 __table_args__ = (
404 404 Index('u_username_idx', 'username'),
405 405 Index('u_email_idx', 'email'),
406 406 _table_args_default_dict,
407 407 )
408 408
409 409 DEFAULT_USER = 'default'
410 410 DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
411 411 # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
412 412 DEFAULT_AUTH_TYPE = 'internal'
413 413
414 414 user_id = Column(Integer(), primary_key=True)
415 415 username = Column(String(255), nullable=False, unique=True)
416 416 password = Column(String(255), nullable=False)
417 417 active = Column(Boolean(), nullable=False, default=True)
418 418 admin = Column(Boolean(), nullable=False, default=False)
419 419 name = Column("firstname", Unicode(255), nullable=False)
420 420 lastname = Column(Unicode(255), nullable=False)
421 421 _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
422 422 last_login = Column(DateTime(timezone=False), nullable=True)
423 423 extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
424 424 extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
425 425 api_key = Column(String(255), nullable=False)
426 426 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
427 427 _user_data = Column("user_data", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
428 428
429 429 user_log = relationship('UserLog')
430 430 user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
431 431
432 432 repositories = relationship('Repository')
433 433 repo_groups = relationship('RepoGroup')
434 434 user_groups = relationship('UserGroup')
435 435 user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
436 436 followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
437 437
438 438 repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
439 439 repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
440 440
441 441 group_member = relationship('UserGroupMember', cascade='all')
442 442
443 443 # comments created by this user
444 444 user_comments = relationship('ChangesetComment', cascade='all')
445 445 # extra emails for this user
446 446 user_emails = relationship('UserEmailMap', cascade='all')
447 447 # extra API keys
448 448 user_api_keys = relationship('UserApiKeys', cascade='all')
449 449 ssh_keys = relationship('UserSshKeys', cascade='all')
450 450
451 451 @hybrid_property
452 452 def email(self):
453 453 return self._email
454 454
455 455 @email.setter
456 456 def email(self, val):
457 457 self._email = val.lower() if val else None
458 458
459 459 @property
460 460 def firstname(self):
461 461 # alias for future
462 462 return self.name
463 463
464 464 @property
465 465 def emails(self):
466 466 other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
467 467 return [self.email] + [x.email for x in other]
468 468
469 469 @property
470 470 def api_keys(self):
471 471 other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
472 472 return [self.api_key] + [x.api_key for x in other]
473 473
474 474 @property
475 475 def ip_addresses(self):
476 476 ret = UserIpMap.query().filter(UserIpMap.user == self).all()
477 477 return [x.ip_addr for x in ret]
478 478
479 479 @property
480 480 def full_name(self):
481 481 return '%s %s' % (self.firstname, self.lastname)
482 482
483 483 @property
484 484 def full_name_or_username(self):
485 485 """
486 486 Show full name.
487 487 If full name is not set, fall back to username.
488 488 """
489 489 return ('%s %s' % (self.firstname, self.lastname)
490 490 if (self.firstname and self.lastname) else self.username)
491 491
492 492 @property
493 493 def full_name_and_username(self):
494 494 """
495 495 Show full name and username as 'Firstname Lastname (username)'.
496 496 If full name is not set, fall back to username.
497 497 """
498 498 return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
499 499 if (self.firstname and self.lastname) else self.username)
500 500
501 501 @property
502 502 def full_contact(self):
503 503 return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
504 504
505 505 @property
506 506 def short_contact(self):
507 507 return '%s %s' % (self.firstname, self.lastname)
508 508
509 509 @property
510 510 def is_admin(self):
511 511 return self.admin
512 512
513 513 @hybrid_property
514 514 def is_default_user(self):
515 515 return self.username == User.DEFAULT_USER
516 516
517 517 @hybrid_property
518 518 def user_data(self):
519 519 if not self._user_data:
520 520 return {}
521 521
522 522 try:
523 523 return json.loads(self._user_data)
524 524 except TypeError:
525 525 return {}
526 526
527 527 @user_data.setter
528 528 def user_data(self, val):
529 529 try:
530 530 self._user_data = json.dumps(val)
531 531 except Exception:
532 532 log.error(traceback.format_exc())
533 533
534 534 def __unicode__(self):
535 535 return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
536 536 self.user_id, self.username)
537 537
538 538 @classmethod
539 539 def guess_instance(cls, value):
540 540 return super(User, cls).guess_instance(value, User.get_by_username)
541 541
542 542 @classmethod
543 543 def get_or_404(cls, id_, allow_default=True):
544 544 '''
545 545 Overridden version of BaseDbModel.get_or_404, with an extra check on
546 546 the default user.
547 547 '''
548 548 user = super(User, cls).get_or_404(id_)
549 549 if not allow_default and user.is_default_user:
550 550 raise DefaultUserException()
551 551 return user
552 552
553 553 @classmethod
554 554 def get_by_username_or_email(cls, username_or_email, case_insensitive=False, cache=False):
555 555 """
556 556 For anything that looks like an email address, look up by the email address (matching
557 557 case insensitively).
558 558 For anything else, try to look up by the user name.
559 559
560 560 This assumes no normal username can have '@' symbol.
561 561 """
562 562 if '@' in username_or_email:
563 563 return User.get_by_email(username_or_email, cache=cache)
564 564 else:
565 565 return User.get_by_username(username_or_email, case_insensitive=case_insensitive, cache=cache)
566 566
567 567 @classmethod
568 568 def get_by_username(cls, username, case_insensitive=False, cache=False):
569 569 if case_insensitive:
570 570 q = cls.query().filter(func.lower(cls.username) == func.lower(username))
571 571 else:
572 572 q = cls.query().filter(cls.username == username)
573 573
574 574 if cache:
575 575 q = q.options(FromCache(
576 576 "sql_cache_short",
577 577 "get_user_%s" % _hash_key(username)
578 578 )
579 579 )
580 580 return q.scalar()
581 581
582 582 @classmethod
583 583 def get_by_api_key(cls, api_key, cache=False, fallback=True):
584 584 if len(api_key) != 40 or not api_key.isalnum():
585 585 return None
586 586
587 587 q = cls.query().filter(cls.api_key == api_key)
588 588
589 589 if cache:
590 590 q = q.options(FromCache("sql_cache_short",
591 591 "get_api_key_%s" % api_key))
592 592 res = q.scalar()
593 593
594 594 if fallback and not res:
595 595 # fallback to additional keys
596 596 _res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
597 597 if _res:
598 598 res = _res.user
599 599 if res is None or not res.active or res.is_default_user:
600 600 return None
601 601 return res
602 602
603 603 @classmethod
604 604 def get_by_email(cls, email, cache=False):
605 605 q = cls.query().filter(func.lower(cls.email) == func.lower(email))
606 606
607 607 if cache:
608 608 q = q.options(FromCache("sql_cache_short",
609 609 "get_email_key_%s" % email))
610 610
611 611 ret = q.scalar()
612 612 if ret is None:
613 613 q = UserEmailMap.query()
614 614 # try fetching in alternate email map
615 615 q = q.filter(func.lower(UserEmailMap.email) == func.lower(email))
616 616 q = q.options(joinedload(UserEmailMap.user))
617 617 if cache:
618 618 q = q.options(FromCache("sql_cache_short",
619 619 "get_email_map_key_%s" % email))
620 620 ret = getattr(q.scalar(), 'user', None)
621 621
622 622 return ret
623 623
624 624 @classmethod
625 625 def get_from_cs_author(cls, author):
626 626 """
627 627 Tries to get User objects out of commit author string
628 628
629 629 :param author:
630 630 """
631 631 from kallithea.lib.helpers import email, author_name
632 632 # Valid email in the attribute passed, see if they're in the system
633 633 _email = email(author)
634 634 if _email:
635 635 user = cls.get_by_email(_email)
636 636 if user is not None:
637 637 return user
638 638 # Maybe we can match by username?
639 639 _author = author_name(author)
640 640 user = cls.get_by_username(_author, case_insensitive=True)
641 641 if user is not None:
642 642 return user
643 643
644 644 def update_lastlogin(self):
645 645 """Update user lastlogin"""
646 646 self.last_login = datetime.datetime.now()
647 647 log.debug('updated user %s lastlogin', self.username)
648 648
649 649 @classmethod
650 650 def get_first_admin(cls):
651 651 user = User.query().filter(User.admin == True).first()
652 652 if user is None:
653 653 raise Exception('Missing administrative account!')
654 654 return user
655 655
656 656 @classmethod
657 657 def get_default_user(cls, cache=False):
658 658 user = User.get_by_username(User.DEFAULT_USER, cache=cache)
659 659 if user is None:
660 660 raise Exception('Missing default account!')
661 661 return user
662 662
663 663 def get_api_data(self, details=False):
664 664 """
665 665 Common function for generating user related data for API
666 666 """
667 667 user = self
668 668 data = dict(
669 669 user_id=user.user_id,
670 670 username=user.username,
671 671 firstname=user.name,
672 672 lastname=user.lastname,
673 673 email=user.email,
674 674 emails=user.emails,
675 675 active=user.active,
676 676 admin=user.admin,
677 677 )
678 678 if details:
679 679 data.update(dict(
680 680 extern_type=user.extern_type,
681 681 extern_name=user.extern_name,
682 682 api_key=user.api_key,
683 683 api_keys=user.api_keys,
684 684 last_login=user.last_login,
685 685 ip_addresses=user.ip_addresses
686 686 ))
687 687 return data
688 688
689 689 def __json__(self):
690 690 data = dict(
691 691 full_name=self.full_name,
692 692 full_name_or_username=self.full_name_or_username,
693 693 short_contact=self.short_contact,
694 694 full_contact=self.full_contact
695 695 )
696 696 data.update(self.get_api_data())
697 697 return data
698 698
699 699
700 700 class UserApiKeys(Base, BaseDbModel):
701 701 __tablename__ = 'user_api_keys'
702 702 __table_args__ = (
703 703 Index('uak_api_key_idx', 'api_key'),
704 704 Index('uak_api_key_expires_idx', 'api_key', 'expires'),
705 705 _table_args_default_dict,
706 706 )
707 707
708 708 user_api_key_id = Column(Integer(), primary_key=True)
709 709 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
710 710 api_key = Column(String(255), nullable=False, unique=True)
711 711 description = Column(UnicodeText(), nullable=False)
712 712 expires = Column(Float(53), nullable=False)
713 713 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
714 714
715 715 user = relationship('User')
716 716
717 717 @hybrid_property
718 718 def is_expired(self):
719 719 return (self.expires != -1) & (time.time() > self.expires)
720 720
721 721
722 722 class UserEmailMap(Base, BaseDbModel):
723 723 __tablename__ = 'user_email_map'
724 724 __table_args__ = (
725 725 Index('uem_email_idx', 'email'),
726 726 _table_args_default_dict,
727 727 )
728 728
729 729 email_id = Column(Integer(), primary_key=True)
730 730 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
731 731 _email = Column("email", String(255), nullable=False, unique=True)
732 732 user = relationship('User')
733 733
734 734 @validates('_email')
735 735 def validate_email(self, key, email):
736 736 # check if this email is not main one
737 737 main_email = Session().query(User).filter(User.email == email).scalar()
738 738 if main_email is not None:
739 739 raise AttributeError('email %s is present in user table' % email)
740 740 return email
741 741
742 742 @hybrid_property
743 743 def email(self):
744 744 return self._email
745 745
746 746 @email.setter
747 747 def email(self, val):
748 748 self._email = val.lower() if val else None
749 749
750 750
751 751 class UserIpMap(Base, BaseDbModel):
752 752 __tablename__ = 'user_ip_map'
753 753 __table_args__ = (
754 754 UniqueConstraint('user_id', 'ip_addr'),
755 755 _table_args_default_dict,
756 756 )
757 757
758 758 ip_id = Column(Integer(), primary_key=True)
759 759 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
760 760 ip_addr = Column(String(255), nullable=False)
761 761 active = Column(Boolean(), nullable=False, default=True)
762 762 user = relationship('User')
763 763
764 764 @classmethod
765 765 def _get_ip_range(cls, ip_addr):
766 766 net = ipaddr.IPNetwork(address=ip_addr)
767 767 return [str(net.network), str(net.broadcast)]
768 768
769 769 def __json__(self):
770 770 return dict(
771 771 ip_addr=self.ip_addr,
772 772 ip_range=self._get_ip_range(self.ip_addr)
773 773 )
774 774
775 775 def __unicode__(self):
776 776 return u"<%s('user_id:%s=>%s')>" % (self.__class__.__name__,
777 777 self.user_id, self.ip_addr)
778 778
779 779
780 780 class UserLog(Base, BaseDbModel):
781 781 __tablename__ = 'user_logs'
782 782 __table_args__ = (
783 783 _table_args_default_dict,
784 784 )
785 785
786 786 user_log_id = Column(Integer(), primary_key=True)
787 787 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
788 788 username = Column(String(255), nullable=False)
789 789 repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
790 790 repository_name = Column(Unicode(255), nullable=False)
791 791 user_ip = Column(String(255), nullable=True)
792 792 action = Column(UnicodeText(), nullable=False)
793 793 action_date = Column(DateTime(timezone=False), nullable=False)
794 794
795 795 def __unicode__(self):
796 796 return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
797 797 self.repository_name,
798 798 self.action)
799 799
800 800 @property
801 801 def action_as_day(self):
802 802 return datetime.date(*self.action_date.timetuple()[:3])
803 803
804 804 user = relationship('User')
805 805 repository = relationship('Repository', cascade='')
806 806
807 807
808 808 class UserGroup(Base, BaseDbModel):
809 809 __tablename__ = 'users_groups'
810 810 __table_args__ = (
811 811 _table_args_default_dict,
812 812 )
813 813
814 814 users_group_id = Column(Integer(), primary_key=True)
815 815 users_group_name = Column(Unicode(255), nullable=False, unique=True)
816 816 user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
817 817 users_group_active = Column(Boolean(), nullable=False)
818 818 owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
819 819 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
820 820 _group_data = Column("group_data", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
821 821
822 822 members = relationship('UserGroupMember', cascade="all, delete-orphan")
823 823 users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
824 824 users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
825 825 users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
826 826 user_user_group_to_perm = relationship('UserUserGroupToPerm', cascade='all')
827 827 user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
828 828
829 829 owner = relationship('User')
830 830
831 831 @hybrid_property
832 832 def group_data(self):
833 833 if not self._group_data:
834 834 return {}
835 835
836 836 try:
837 837 return json.loads(self._group_data)
838 838 except TypeError:
839 839 return {}
840 840
841 841 @group_data.setter
842 842 def group_data(self, val):
843 843 try:
844 844 self._group_data = json.dumps(val)
845 845 except Exception:
846 846 log.error(traceback.format_exc())
847 847
848 848 def __unicode__(self):
849 849 return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
850 850 self.users_group_id,
851 851 self.users_group_name)
852 852
853 853 @classmethod
854 854 def guess_instance(cls, value):
855 855 return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
856 856
857 857 @classmethod
858 858 def get_by_group_name(cls, group_name, cache=False,
859 859 case_insensitive=False):
860 860 if case_insensitive:
861 861 q = cls.query().filter(func.lower(cls.users_group_name) == func.lower(group_name))
862 862 else:
863 863 q = cls.query().filter(cls.users_group_name == group_name)
864 864 if cache:
865 865 q = q.options(FromCache(
866 866 "sql_cache_short",
867 867 "get_group_%s" % _hash_key(group_name)
868 868 )
869 869 )
870 870 return q.scalar()
871 871
872 872 @classmethod
873 873 def get(cls, user_group_id, cache=False):
874 874 user_group = cls.query()
875 875 if cache:
876 876 user_group = user_group.options(FromCache("sql_cache_short",
877 877 "get_users_group_%s" % user_group_id))
878 878 return user_group.get(user_group_id)
879 879
880 880 def get_api_data(self, with_members=True):
881 881 user_group = self
882 882
883 883 data = dict(
884 884 users_group_id=user_group.users_group_id,
885 885 group_name=user_group.users_group_name,
886 886 group_description=user_group.user_group_description,
887 887 active=user_group.users_group_active,
888 888 owner=user_group.owner.username,
889 889 )
890 890 if with_members:
891 891 data['members'] = [
892 892 ugm.user.get_api_data()
893 893 for ugm in user_group.members
894 894 ]
895 895
896 896 return data
897 897
898 898
899 899 class UserGroupMember(Base, BaseDbModel):
900 900 __tablename__ = 'users_groups_members'
901 901 __table_args__ = (
902 902 _table_args_default_dict,
903 903 )
904 904
905 905 users_group_member_id = Column(Integer(), primary_key=True)
906 906 users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
907 907 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
908 908
909 909 user = relationship('User')
910 910 users_group = relationship('UserGroup')
911 911
912 912 def __init__(self, gr_id='', u_id=''):
913 913 self.users_group_id = gr_id
914 914 self.user_id = u_id
915 915
916 916
917 917 class RepositoryField(Base, BaseDbModel):
918 918 __tablename__ = 'repositories_fields'
919 919 __table_args__ = (
920 920 UniqueConstraint('repository_id', 'field_key'), # no-multi field
921 921 _table_args_default_dict,
922 922 )
923 923
924 924 PREFIX = 'ex_' # prefix used in form to not conflict with already existing fields
925 925
926 926 repo_field_id = Column(Integer(), primary_key=True)
927 927 repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
928 928 field_key = Column(String(250), nullable=False)
929 929 field_label = Column(String(1024), nullable=False)
930 930 field_value = Column(String(10000), nullable=False)
931 931 field_desc = Column(String(1024), nullable=False)
932 932 field_type = Column(String(255), nullable=False)
933 933 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
934 934
935 935 repository = relationship('Repository')
936 936
937 937 @property
938 938 def field_key_prefixed(self):
939 939 return 'ex_%s' % self.field_key
940 940
941 941 @classmethod
942 942 def un_prefix_key(cls, key):
943 943 if key.startswith(cls.PREFIX):
944 944 return key[len(cls.PREFIX):]
945 945 return key
946 946
947 947 @classmethod
948 948 def get_by_key_name(cls, key, repo):
949 949 row = cls.query() \
950 950 .filter(cls.repository == repo) \
951 951 .filter(cls.field_key == key).scalar()
952 952 return row
953 953
954 954
955 955 class Repository(Base, BaseDbModel):
956 956 __tablename__ = 'repositories'
957 957 __table_args__ = (
958 958 Index('r_repo_name_idx', 'repo_name'),
959 959 _table_args_default_dict,
960 960 )
961 961
962 962 DEFAULT_CLONE_URI = '{scheme}://{user}@{netloc}/{repo}'
963 963 DEFAULT_CLONE_SSH = 'ssh://{system_user}@{hostname}/{repo}'
964 964
965 965 STATE_CREATED = u'repo_state_created'
966 966 STATE_PENDING = u'repo_state_pending'
967 967 STATE_ERROR = u'repo_state_error'
968 968
969 969 repo_id = Column(Integer(), primary_key=True)
970 970 repo_name = Column(Unicode(255), nullable=False, unique=True)
971 971 repo_state = Column(String(255), nullable=False)
972 972
973 973 clone_uri = Column(String(255), nullable=True) # FIXME: not nullable?
974 974 repo_type = Column(String(255), nullable=False) # 'hg' or 'git'
975 975 owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
976 976 private = Column(Boolean(), nullable=False)
977 977 enable_statistics = Column("statistics", Boolean(), nullable=False, default=True)
978 978 enable_downloads = Column("downloads", Boolean(), nullable=False, default=True)
979 979 description = Column(Unicode(10000), nullable=False)
980 980 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
981 981 updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
982 982 _landing_revision = Column("landing_revision", String(255), nullable=False)
983 983 _changeset_cache = Column("changeset_cache", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
984 984
985 985 fork_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
986 986 group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=True)
987 987
988 988 owner = relationship('User')
989 989 fork = relationship('Repository', remote_side=repo_id)
990 990 group = relationship('RepoGroup')
991 991 repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
992 992 users_group_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
993 993 stats = relationship('Statistics', cascade='all', uselist=False)
994 994
995 995 followers = relationship('UserFollowing',
996 996 primaryjoin='UserFollowing.follows_repository_id==Repository.repo_id',
997 997 cascade='all')
998 998 extra_fields = relationship('RepositoryField',
999 999 cascade="all, delete-orphan")
1000 1000
1001 1001 logs = relationship('UserLog')
1002 1002 comments = relationship('ChangesetComment', cascade="all, delete-orphan")
1003 1003
1004 1004 pull_requests_org = relationship('PullRequest',
1005 1005 primaryjoin='PullRequest.org_repo_id==Repository.repo_id',
1006 1006 cascade="all, delete-orphan")
1007 1007
1008 1008 pull_requests_other = relationship('PullRequest',
1009 1009 primaryjoin='PullRequest.other_repo_id==Repository.repo_id',
1010 1010 cascade="all, delete-orphan")
1011 1011
1012 1012 def __unicode__(self):
1013 1013 return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
1014 1014 safe_unicode(self.repo_name))
1015 1015
1016 1016 @hybrid_property
1017 1017 def landing_rev(self):
1018 1018 # always should return [rev_type, rev]
1019 1019 if self._landing_revision:
1020 1020 _rev_info = self._landing_revision.split(':')
1021 1021 if len(_rev_info) < 2:
1022 1022 _rev_info.insert(0, 'rev')
1023 1023 return [_rev_info[0], _rev_info[1]]
1024 1024 return [None, None]
1025 1025
1026 1026 @landing_rev.setter
1027 1027 def landing_rev(self, val):
1028 1028 if ':' not in val:
1029 1029 raise ValueError('value must be delimited with `:` and consist '
1030 1030 'of <rev_type>:<rev>, got %s instead' % val)
1031 1031 self._landing_revision = val
1032 1032
1033 1033 @hybrid_property
1034 1034 def changeset_cache(self):
1035 1035 try:
1036 1036 cs_cache = json.loads(self._changeset_cache) # might raise on bad data
1037 1037 cs_cache['raw_id'] # verify data, raise exception on error
1038 1038 return cs_cache
1039 1039 except (TypeError, KeyError, ValueError):
1040 1040 return EmptyChangeset().__json__()
1041 1041
1042 1042 @changeset_cache.setter
1043 1043 def changeset_cache(self, val):
1044 1044 try:
1045 1045 self._changeset_cache = json.dumps(val)
1046 1046 except Exception:
1047 1047 log.error(traceback.format_exc())
1048 1048
1049 1049 @classmethod
1050 1050 def query(cls, sorted=False):
1051 1051 """Add Repository-specific helpers for common query constructs.
1052 1052
1053 1053 sorted: if True, apply the default ordering (name, case insensitive).
1054 1054 """
1055 1055 q = super(Repository, cls).query()
1056 1056
1057 1057 if sorted:
1058 1058 q = q.order_by(func.lower(Repository.repo_name))
1059 1059
1060 1060 return q
1061 1061
1062 1062 @classmethod
1063 1063 def url_sep(cls):
1064 1064 return URL_SEP
1065 1065
1066 1066 @classmethod
1067 1067 def normalize_repo_name(cls, repo_name):
1068 1068 """
1069 1069 Normalizes os specific repo_name to the format internally stored inside
1070 1070 database using URL_SEP
1071 1071
1072 1072 :param cls:
1073 1073 :param repo_name:
1074 1074 """
1075 1075 return cls.url_sep().join(repo_name.split(os.sep))
1076 1076
1077 1077 @classmethod
1078 1078 def guess_instance(cls, value):
1079 1079 return super(Repository, cls).guess_instance(value, Repository.get_by_repo_name)
1080 1080
1081 1081 @classmethod
1082 1082 def get_by_repo_name(cls, repo_name, case_insensitive=False):
1083 1083 """Get the repo, defaulting to database case sensitivity.
1084 1084 case_insensitive will be slower and should only be specified if necessary."""
1085 1085 if case_insensitive:
1086 1086 q = Session().query(cls).filter(func.lower(cls.repo_name) == func.lower(repo_name))
1087 1087 else:
1088 1088 q = Session().query(cls).filter(cls.repo_name == repo_name)
1089 1089 q = q.options(joinedload(Repository.fork)) \
1090 1090 .options(joinedload(Repository.owner)) \
1091 1091 .options(joinedload(Repository.group))
1092 1092 return q.scalar()
1093 1093
1094 1094 @classmethod
1095 1095 def get_by_full_path(cls, repo_full_path):
1096 1096 base_full_path = os.path.realpath(cls.base_path())
1097 1097 repo_full_path = os.path.realpath(repo_full_path)
1098 1098 assert repo_full_path.startswith(base_full_path + os.path.sep)
1099 1099 repo_name = repo_full_path[len(base_full_path) + 1:]
1100 1100 repo_name = cls.normalize_repo_name(repo_name)
1101 1101 return cls.get_by_repo_name(repo_name.strip(URL_SEP))
1102 1102
1103 1103 @classmethod
1104 1104 def get_repo_forks(cls, repo_id):
1105 1105 return cls.query().filter(Repository.fork_id == repo_id)
1106 1106
1107 1107 @classmethod
1108 1108 def base_path(cls):
1109 1109 """
1110 1110 Returns base path where all repos are stored
1111 1111
1112 1112 :param cls:
1113 1113 """
1114 1114 q = Session().query(Ui) \
1115 1115 .filter(Ui.ui_key == cls.url_sep())
1116 1116 q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
1117 1117 return q.one().ui_value
1118 1118
1119 1119 @property
1120 1120 def forks(self):
1121 1121 """
1122 1122 Return forks of this repo
1123 1123 """
1124 1124 return Repository.get_repo_forks(self.repo_id)
1125 1125
1126 1126 @property
1127 1127 def parent(self):
1128 1128 """
1129 1129 Returns fork parent
1130 1130 """
1131 1131 return self.fork
1132 1132
1133 1133 @property
1134 1134 def just_name(self):
1135 1135 return self.repo_name.split(Repository.url_sep())[-1]
1136 1136
1137 1137 @property
1138 1138 def groups_with_parents(self):
1139 1139 groups = []
1140 1140 group = self.group
1141 1141 while group is not None:
1142 1142 groups.append(group)
1143 1143 group = group.parent_group
1144 1144 assert group not in groups, group # avoid recursion on bad db content
1145 1145 groups.reverse()
1146 1146 return groups
1147 1147
1148 1148 @LazyProperty
1149 1149 def repo_path(self):
1150 1150 """
1151 1151 Returns the full base path of this repository, i.e. where it actually
1152 1152 exists on the filesystem
1153 1153 """
1154 1154 q = Session().query(Ui).filter(Ui.ui_key ==
1155 1155 Repository.url_sep())
1156 1156 q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
1157 1157 return q.one().ui_value
1158 1158
1159 1159 @property
1160 1160 def repo_full_path(self):
1161 1161 p = [self.repo_path]
1162 1162 # we need to split the name by / since this is how we store the
1163 1163 # names in the database, but that eventually needs to be converted
1164 1164 # into a valid system path
1165 1165 p += self.repo_name.split(Repository.url_sep())
1166 1166 return os.path.join(*map(safe_unicode, p))
1167 1167
1168 1168 @property
1169 1169 def cache_keys(self):
1170 1170 """
1171 1171 Returns associated cache keys for that repo
1172 1172 """
1173 1173 return CacheInvalidation.query() \
1174 1174 .filter(CacheInvalidation.cache_args == self.repo_name) \
1175 1175 .order_by(CacheInvalidation.cache_key) \
1176 1176 .all()
1177 1177
1178 1178 def get_new_name(self, repo_name):
1179 1179 """
1180 1180 returns new full repository name based on assigned group and new name
1181 1181
1182 1182 :param repo_name:
1183 1183 """
1184 1184 path_prefix = self.group.full_path_splitted if self.group else []
1185 1185 return Repository.url_sep().join(path_prefix + [repo_name])
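
    # Example (names are illustrative): for a repository living in the group
    # 'projects/web', get_new_name('site') returns 'projects/web/site'; for a
    # repository without a group it simply returns 'site'.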
1186 1186
1187 1187 @property
1188 1188 def _ui(self):
1189 1189 """
1190 1190 Creates a db-based ui object for this repository
1191 1191 """
1192 1192 from kallithea.lib.utils import make_ui
1193 1193 return make_ui()
1194 1194
1195 1195 @classmethod
1196 1196 def is_valid(cls, repo_name):
1197 1197 """
1198 1198 returns True if the given repo name points to a valid repository on the filesystem
1199 1199
1200 1200 :param cls:
1201 1201 :param repo_name:
1202 1202 """
1203 1203 from kallithea.lib.utils import is_valid_repo
1204 1204
1205 1205 return is_valid_repo(repo_name, cls.base_path())
1206 1206
1207 1207 def get_api_data(self, with_revision_names=False,
1208 1208 with_pullrequests=False):
1209 1209 """
1210 1210 Common function for generating repo api data.
1211 1211 Optionally, also return tags, branches, bookmarks and PRs.
1212 1212 """
1213 1213 repo = self
1214 1214 data = dict(
1215 1215 repo_id=repo.repo_id,
1216 1216 repo_name=repo.repo_name,
1217 1217 repo_type=repo.repo_type,
1218 1218 clone_uri=repo.clone_uri,
1219 1219 private=repo.private,
1220 1220 created_on=repo.created_on,
1221 1221 description=repo.description,
1222 1222 landing_rev=repo.landing_rev,
1223 1223 owner=repo.owner.username,
1224 1224 fork_of=repo.fork.repo_name if repo.fork else None,
1225 1225 enable_statistics=repo.enable_statistics,
1226 1226 enable_downloads=repo.enable_downloads,
1227 1227 last_changeset=repo.changeset_cache,
1228 1228 )
1229 1229 if with_revision_names:
1230 1230 scm_repo = repo.scm_instance_no_cache()
1231 1231 data.update(dict(
1232 1232 tags=scm_repo.tags,
1233 1233 branches=scm_repo.branches,
1234 1234 bookmarks=scm_repo.bookmarks,
1235 1235 ))
1236 1236 if with_pullrequests:
1237 1237 data['pull_requests'] = repo.pull_requests_other
1238 1238 rc_config = Setting.get_app_settings()
1239 1239 repository_fields = str2bool(rc_config.get('repository_fields'))
1240 1240 if repository_fields:
1241 1241 for f in self.extra_fields:
1242 1242 data[f.field_key_prefixed] = f.field_value
1243 1243
1244 1244 return data
1245 1245
1246 1246 @property
1247 1247 def last_db_change(self):
1248 1248 return self.updated_on
1249 1249
1250 1250 @property
1251 1251 def clone_uri_hidden(self):
1252 1252 clone_uri = self.clone_uri
1253 1253 if clone_uri:
1254 1254 import urlobject
1255 1255 url_obj = urlobject.URLObject(self.clone_uri)
1256 1256 if url_obj.password:
1257 1257 clone_uri = url_obj.with_password('*****')
1258 1258 return clone_uri
1259 1259
1260 1260 def clone_url(self, clone_uri_tmpl, with_id=False, username=None):
1261 1261 if '{repo}' not in clone_uri_tmpl and '_{repoid}' not in clone_uri_tmpl:
1262 1262 log.error("Configured clone_uri_tmpl %r has no '{repo}' or '_{repoid}' and cannot toggle to use repo id URLs", clone_uri_tmpl)
1263 1263 elif with_id:
1264 1264 clone_uri_tmpl = clone_uri_tmpl.replace('{repo}', '_{repoid}')
1265 1265 else:
1266 1266 clone_uri_tmpl = clone_uri_tmpl.replace('_{repoid}', '{repo}')
1267 1267
1268 1268 import kallithea.lib.helpers as h
1269 1269 prefix_url = h.canonical_url('home')
1270 1270
1271 1271 return get_clone_url(clone_uri_tmpl=clone_uri_tmpl,
1272 1272 prefix_url=prefix_url,
1273 1273 repo_name=self.repo_name,
1274 1274 repo_id=self.repo_id,
1275 1275 username=username)
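
    # A hedged sketch of the template handling (template and outputs below are
    # illustrative, not the configured defaults):
    #
    #     repo.clone_url(clone_uri_tmpl='{scheme}://{user}@{netloc}/{repo}',
    #                    username='jane')
    #     # might give something like 'https://jane@example.com/mygroup/myrepo',
    #     # while with_id=True swaps '{repo}' for '_{repoid}', yielding e.g. '.../_42'.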
1276 1276
1277 1277 def set_state(self, state):
1278 1278 self.repo_state = state
1279 1279
1280 1280 #==========================================================================
1281 1281 # SCM PROPERTIES
1282 1282 #==========================================================================
1283 1283
1284 1284 def get_changeset(self, rev=None):
1285 1285 return get_changeset_safe(self.scm_instance, rev)
1286 1286
1287 1287 def get_landing_changeset(self):
1288 1288 """
1289 1289 Returns the landing changeset, or the tip if the landing changeset doesn't exist
1290 1290 """
1291 1291 _rev_type, _rev = self.landing_rev
1292 1292 cs = self.get_changeset(_rev)
1293 1293 if isinstance(cs, EmptyChangeset):
1294 1294 return self.get_changeset()
1295 1295 return cs
1296 1296
1297 1297 def update_changeset_cache(self, cs_cache=None):
1298 1298 """
1299 1299 Update cache of last changeset for repository, keys should be::
1300 1300
1301 1301 short_id
1302 1302 raw_id
1303 1303 revision
1304 1304 message
1305 1305 date
1306 1306 author
1307 1307
1308 1308 :param cs_cache:
1309 1309 """
1310 1310 from kallithea.lib.vcs.backends.base import BaseChangeset
1311 1311 if cs_cache is None:
1312 1312 cs_cache = EmptyChangeset()
1313 1313 # use no-cache version here
1314 1314 scm_repo = self.scm_instance_no_cache()
1315 1315 if scm_repo:
1316 1316 cs_cache = scm_repo.get_changeset()
1317 1317
1318 1318 if isinstance(cs_cache, BaseChangeset):
1319 1319 cs_cache = cs_cache.__json__()
1320 1320
1321 1321 if (not self.changeset_cache or cs_cache['raw_id'] != self.changeset_cache['raw_id']):
1322 1322 _default = datetime.datetime.fromtimestamp(0)
1323 1323 last_change = cs_cache.get('date') or _default
1324 1324 log.debug('updated repo %s with new cs cache %s',
1325 1325 self.repo_name, cs_cache)
1326 1326 self.updated_on = last_change
1327 1327 self.changeset_cache = cs_cache
1328 1328 Session().commit()
1329 1329 else:
1330 1330 log.debug('changeset_cache for %s already up to date with %s',
1331 1331 self.repo_name, cs_cache['raw_id'])
1332 1332
1333 1333 @property
1334 1334 def tip(self):
1335 1335 return self.get_changeset('tip')
1336 1336
1337 1337 @property
1338 1338 def author(self):
1339 1339 return self.tip.author
1340 1340
1341 1341 @property
1342 1342 def last_change(self):
1343 1343 return self.scm_instance.last_change
1344 1344
1345 1345 def get_comments(self, revisions=None):
1346 1346 """
1347 1347 Returns comments for this repository grouped by revisions
1348 1348
1349 1349 :param revisions: filter query by revisions only
1350 1350 """
1351 1351 cmts = ChangesetComment.query() \
1352 1352 .filter(ChangesetComment.repo == self)
1353 1353 if revisions is not None:
1354 1354 if not revisions:
1355 1355 return {} # don't use sql 'in' on empty set
1356 1356 cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
1357 1357 grouped = collections.defaultdict(list)
1358 1358 for cmt in cmts.all():
1359 1359 grouped[cmt.revision].append(cmt)
1360 1360 return grouped
1361 1361
1362 1362 def statuses(self, revisions):
1363 1363 """
1364 1364 Returns statuses for this repository.
1365 1365 PRs without any votes do _not_ show up as unreviewed.
1366 1366
1367 1367 :param revisions: list of revisions to get statuses for
1368 1368 """
1369 1369 if not revisions:
1370 1370 return {}
1371 1371
1372 1372 statuses = ChangesetStatus.query() \
1373 1373 .filter(ChangesetStatus.repo == self) \
1374 1374 .filter(ChangesetStatus.version == 0) \
1375 1375 .filter(ChangesetStatus.revision.in_(revisions))
1376 1376
1377 1377 grouped = {}
1378 1378 for stat in statuses.all():
1379 1379 pr_id = pr_nice_id = pr_repo = None
1380 1380 if stat.pull_request:
1381 1381 pr_id = stat.pull_request.pull_request_id
1382 1382 pr_nice_id = PullRequest.make_nice_id(pr_id)
1383 1383 pr_repo = stat.pull_request.other_repo.repo_name
1384 1384 grouped[stat.revision] = [str(stat.status), stat.status_lbl,
1385 1385 pr_id, pr_repo, pr_nice_id,
1386 1386 stat.author]
1387 1387 return grouped
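
    # Illustrative shape of the returned mapping (values here are made up):
    #
    #     {'9fae...': ['approved', 'Approved', 7, 'mygroup/other-repo', '#7', <User>]}
    #
    # i.e. [status, status_lbl, pr_id, pr_repo, pr_nice_id, author] per revision.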
1388 1388
1389 1389 def _repo_size(self):
1390 1390 from kallithea.lib import helpers as h
1391 1391 log.debug('calculating repository size...')
1392 1392 return h.format_byte_size(self.scm_instance.size)
1393 1393
1394 1394 #==========================================================================
1395 1395 # SCM CACHE INSTANCE
1396 1396 #==========================================================================
1397 1397
1398 1398 def set_invalidate(self):
1399 1399 """
1400 1400 Mark caches of this repo as invalid.
1401 1401 """
1402 1402 CacheInvalidation.set_invalidate(self.repo_name)
1403 1403
1404 1404 _scm_instance = None
1405 1405
1406 1406 @property
1407 1407 def scm_instance(self):
1408 1408 if self._scm_instance is None:
1409 1409 self._scm_instance = self.scm_instance_cached()
1410 1410 return self._scm_instance
1411 1411
1412 1412 def scm_instance_cached(self, valid_cache_keys=None):
1413 1413 @cache_region('long_term', 'scm_instance_cached')
1414 1414 def _c(repo_name): # repo_name is just for the cache key
1415 1415 log.debug('Creating new %s scm_instance and populating cache', repo_name)
1416 1416 return self.scm_instance_no_cache()
1417 1417 rn = self.repo_name
1418 1418
1419 1419 valid = CacheInvalidation.test_and_set_valid(rn, None, valid_cache_keys=valid_cache_keys)
1420 1420 if not valid:
1421 1421 log.debug('Cache for %s invalidated, getting new object', rn)
1422 1422 region_invalidate(_c, None, 'scm_instance_cached', rn)
1423 1423 else:
1424 1424 log.debug('Trying to get scm_instance of %s from cache', rn)
1425 1425 return _c(rn)
1426 1426
1427 1427 def scm_instance_no_cache(self):
1428 1428 repo_full_path = safe_str(self.repo_full_path)
1429 1429 alias = get_scm(repo_full_path)[0]
1430 1430 log.debug('Creating instance of %s repository from %s',
1431 1431 alias, self.repo_full_path)
1432 1432 backend = get_backend(alias)
1433 1433
1434 1434 if alias == 'hg':
1435 1435 repo = backend(repo_full_path, create=False,
1436 1436 baseui=self._ui)
1437 1437 else:
1438 1438 repo = backend(repo_full_path, create=False)
1439 1439
1440 1440 return repo
1441 1441
1442 1442 def __json__(self):
1443 1443 return dict(
1444 1444 repo_id=self.repo_id,
1445 1445 repo_name=self.repo_name,
1446 1446 landing_rev=self.landing_rev,
1447 1447 )
1448 1448
1449 1449
1450 1450 class RepoGroup(Base, BaseDbModel):
1451 1451 __tablename__ = 'groups'
1452 1452 __table_args__ = (
1453 1453 _table_args_default_dict,
1454 1454 )
1455 1455
1456 1456 SEP = ' &raquo; '
1457 1457
1458 1458 group_id = Column(Integer(), primary_key=True)
1459 1459 group_name = Column(Unicode(255), nullable=False, unique=True) # full path
1460 1460 parent_group_id = Column('group_parent_id', Integer(), ForeignKey('groups.group_id'), nullable=True)
1461 1461 group_description = Column(Unicode(10000), nullable=False)
1462 1462 owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
1463 1463 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
1464 1464
1465 1465 repo_group_to_perm = relationship('UserRepoGroupToPerm', cascade='all', order_by='UserRepoGroupToPerm.group_to_perm_id')
1466 1466 users_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
1467 1467 parent_group = relationship('RepoGroup', remote_side=group_id)
1468 1468 owner = relationship('User')
1469 1469
1470 1470 @classmethod
1471 1471 def query(cls, sorted=False):
1472 1472 """Add RepoGroup-specific helpers for common query constructs.
1473 1473
1474 1474 sorted: if True, apply the default ordering (name, case insensitive).
1475 1475 """
1476 1476 q = super(RepoGroup, cls).query()
1477 1477
1478 1478 if sorted:
1479 1479 q = q.order_by(func.lower(RepoGroup.group_name))
1480 1480
1481 1481 return q
1482 1482
1483 1483 def __init__(self, group_name='', parent_group=None):
1484 1484 self.group_name = group_name
1485 1485 self.parent_group = parent_group
1486 1486
1487 1487 def __unicode__(self):
1488 1488 return u"<%s('id:%s:%s')>" % (self.__class__.__name__, self.group_id,
1489 1489 self.group_name)
1490 1490
1491 1491 @classmethod
1492 1492 def _generate_choice(cls, repo_group):
1493 1493 """Return tuple with group_id and name as html literal"""
1494 1494 from webhelpers2.html import literal
1495 1495 if repo_group is None:
1496 1496 return (-1, u'-- %s --' % _('top level'))
1497 1497 return repo_group.group_id, literal(cls.SEP.join(repo_group.full_path_splitted))
1498 1498
1499 1499 @classmethod
1500 1500 def groups_choices(cls, groups):
1501 1501 """Return tuples with group_id and name as html literal."""
1502 1502 return sorted((cls._generate_choice(g) for g in groups),
1503 1503 key=lambda c: c[1].split(cls.SEP))
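
    # Illustration (ids and names made up): for nested groups 'projects' and
    # 'projects/web' this might return
    #
    #     [(3, u'projects'), (5, u'projects &raquo; web')]
    #
    # i.e. (group_id, full path rendered with SEP) tuples, sorted by path.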
1504 1504
1505 1505 @classmethod
1506 1506 def url_sep(cls):
1507 1507 return URL_SEP
1508 1508
1509 1509 @classmethod
1510 1510 def guess_instance(cls, value):
1511 1511 return super(RepoGroup, cls).guess_instance(value, RepoGroup.get_by_group_name)
1512 1512
1513 1513 @classmethod
1514 1514 def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
1515 1515 group_name = group_name.rstrip('/')
1516 1516 if case_insensitive:
1517 1517 gr = cls.query() \
1518 1518 .filter(func.lower(cls.group_name) == func.lower(group_name))
1519 1519 else:
1520 1520 gr = cls.query() \
1521 1521 .filter(cls.group_name == group_name)
1522 1522 if cache:
1523 1523 gr = gr.options(FromCache(
1524 1524 "sql_cache_short",
1525 1525 "get_group_%s" % _hash_key(group_name)
1526 1526 )
1527 1527 )
1528 1528 return gr.scalar()
1529 1529
1530 1530 @property
1531 1531 def parents(self):
1532 1532 groups = []
1533 1533 group = self.parent_group
1534 1534 while group is not None:
1535 1535 groups.append(group)
1536 1536 group = group.parent_group
1537 1537 assert group not in groups, group # avoid recursion on bad db content
1538 1538 groups.reverse()
1539 1539 return groups
1540 1540
1541 1541 @property
1542 1542 def children(self):
1543 1543 return RepoGroup.query().filter(RepoGroup.parent_group == self)
1544 1544
1545 1545 @property
1546 1546 def name(self):
1547 1547 return self.group_name.split(RepoGroup.url_sep())[-1]
1548 1548
1549 1549 @property
1550 1550 def full_path(self):
1551 1551 return self.group_name
1552 1552
1553 1553 @property
1554 1554 def full_path_splitted(self):
1555 1555 return self.group_name.split(RepoGroup.url_sep())
1556 1556
1557 1557 @property
1558 1558 def repositories(self):
1559 1559 return Repository.query(sorted=True).filter_by(group=self)
1560 1560
1561 1561 @property
1562 1562 def repositories_recursive_count(self):
1563 1563 cnt = self.repositories.count()
1564 1564
1565 1565 def children_count(group):
1566 1566 cnt = 0
1567 1567 for child in group.children:
1568 1568 cnt += child.repositories.count()
1569 1569 cnt += children_count(child)
1570 1570 return cnt
1571 1571
1572 1572 return cnt + children_count(self)
1573 1573
1574 1574 def _recursive_objects(self, include_repos=True):
1575 1575 all_ = []
1576 1576
1577 1577 def _get_members(root_gr):
1578 1578 if include_repos:
1579 1579 for r in root_gr.repositories:
1580 1580 all_.append(r)
1581 1581 childs = root_gr.children.all()
1582 1582 if childs:
1583 1583 for gr in childs:
1584 1584 all_.append(gr)
1585 1585 _get_members(gr)
1586 1586
1587 1587 _get_members(self)
1588 1588 return [self] + all_
1589 1589
1590 1590 def recursive_groups_and_repos(self):
1591 1591 """
1592 1592 Recursively return all groups, with the repositories in those groups
1593 1593 """
1594 1594 return self._recursive_objects()
1595 1595
1596 1596 def recursive_groups(self):
1597 1597 """
1598 1598 Returns all child groups of this group, including children of children
1599 1599 """
1600 1600 return self._recursive_objects(include_repos=False)
1601 1601
1602 1602 def get_new_name(self, group_name):
1603 1603 """
1604 1604 returns new full group name based on parent and new name
1605 1605
1606 1606 :param group_name:
1607 1607 """
1608 1608 path_prefix = (self.parent_group.full_path_splitted if
1609 1609 self.parent_group else [])
1610 1610 return RepoGroup.url_sep().join(path_prefix + [group_name])
1611 1611
1612 1612 def get_api_data(self):
1613 1613 """
1614 1614 Common function for generating api data
1615 1615
1616 1616 """
1617 1617 group = self
1618 1618 data = dict(
1619 1619 group_id=group.group_id,
1620 1620 group_name=group.group_name,
1621 1621 group_description=group.group_description,
1622 1622 parent_group=group.parent_group.group_name if group.parent_group else None,
1623 1623 repositories=[x.repo_name for x in group.repositories],
1624 1624 owner=group.owner.username
1625 1625 )
1626 1626 return data
1627 1627
1628 1628
1629 1629 class Permission(Base, BaseDbModel):
1630 1630 __tablename__ = 'permissions'
1631 1631 __table_args__ = (
1632 1632 Index('p_perm_name_idx', 'permission_name'),
1633 1633 _table_args_default_dict,
1634 1634 )
1635 1635
1636 1636 PERMS = (
1637 1637 ('hg.admin', _('Kallithea Administrator')),
1638 1638
1639 1639 ('repository.none', _('Default user has no access to new repositories')),
1640 1640 ('repository.read', _('Default user has read access to new repositories')),
1641 1641 ('repository.write', _('Default user has write access to new repositories')),
1642 1642 ('repository.admin', _('Default user has admin access to new repositories')),
1643 1643
1644 1644 ('group.none', _('Default user has no access to new repository groups')),
1645 1645 ('group.read', _('Default user has read access to new repository groups')),
1646 1646 ('group.write', _('Default user has write access to new repository groups')),
1647 1647 ('group.admin', _('Default user has admin access to new repository groups')),
1648 1648
1649 1649 ('usergroup.none', _('Default user has no access to new user groups')),
1650 1650 ('usergroup.read', _('Default user has read access to new user groups')),
1651 1651 ('usergroup.write', _('Default user has write access to new user groups')),
1652 1652 ('usergroup.admin', _('Default user has admin access to new user groups')),
1653 1653
1654 1654 ('hg.repogroup.create.false', _('Only admins can create repository groups')),
1655 1655 ('hg.repogroup.create.true', _('Non-admins can create repository groups')),
1656 1656
1657 1657 ('hg.usergroup.create.false', _('Only admins can create user groups')),
1658 1658 ('hg.usergroup.create.true', _('Non-admins can create user groups')),
1659 1659
1660 1660 ('hg.create.none', _('Only admins can create top level repositories')),
1661 1661 ('hg.create.repository', _('Non-admins can create top level repositories')),
1662 1662
1663 1663 ('hg.create.write_on_repogroup.true', _('Repository creation enabled with write permission to a repository group')),
1664 1664 ('hg.create.write_on_repogroup.false', _('Repository creation disabled with write permission to a repository group')),
1665 1665
1666 1666 ('hg.fork.none', _('Only admins can fork repositories')),
1667 1667 ('hg.fork.repository', _('Non-admins can fork repositories')),
1668 1668
1669 1669 ('hg.register.none', _('Registration disabled')),
1670 1670 ('hg.register.manual_activate', _('User registration with manual account activation')),
1671 1671 ('hg.register.auto_activate', _('User registration with automatic account activation')),
1672 1672
1673 1673 ('hg.extern_activate.manual', _('Manual activation of external account')),
1674 1674 ('hg.extern_activate.auto', _('Automatic activation of external account')),
1675 1675 )
1676 1676
1677 1677 # definition of system default permissions for DEFAULT user
1678 1678 DEFAULT_USER_PERMISSIONS = (
1679 1679 'repository.read',
1680 1680 'group.read',
1681 1681 'usergroup.read',
1682 1682 'hg.create.repository',
1683 1683 'hg.create.write_on_repogroup.true',
1684 1684 'hg.fork.repository',
1685 1685 'hg.register.manual_activate',
1686 1686 'hg.extern_activate.auto',
1687 1687 )
1688 1688
1689 1689 # Weight defines which permissions are more important.
1690 1690 # The higher the number, the more important the permission.
1692 1692 PERM_WEIGHTS = {
1693 1693 'repository.none': 0,
1694 1694 'repository.read': 1,
1695 1695 'repository.write': 3,
1696 1696 'repository.admin': 4,
1697 1697
1698 1698 'group.none': 0,
1699 1699 'group.read': 1,
1700 1700 'group.write': 3,
1701 1701 'group.admin': 4,
1702 1702
1703 1703 'usergroup.none': 0,
1704 1704 'usergroup.read': 1,
1705 1705 'usergroup.write': 3,
1706 1706 'usergroup.admin': 4,
1707 1707
1708 1708 'hg.repogroup.create.false': 0,
1709 1709 'hg.repogroup.create.true': 1,
1710 1710
1711 1711 'hg.usergroup.create.false': 0,
1712 1712 'hg.usergroup.create.true': 1,
1713 1713
1714 1714 'hg.fork.none': 0,
1715 1715 'hg.fork.repository': 1,
1716 1716
1717 1717 'hg.create.none': 0,
1718 1718 'hg.create.repository': 1,
1719 1719
1720 1720 'hg.create.write_on_repogroup.false': 0,
1721 1721 'hg.create.write_on_repogroup.true': 1,
1722 1722
1723 1723 'hg.register.none': 0,
1724 1724 'hg.register.manual_activate': 1,
1725 1725 'hg.register.auto_activate': 2,
1726 1726
1727 1727 'hg.extern_activate.manual': 0,
1728 1728 'hg.extern_activate.auto': 1,
1729 1729 }
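
    # A minimal sketch (not the actual permission-resolution code): when several
    # grants apply to the same object, the weights can be used to keep the
    # strongest one, e.g.
    #
    #     granted = ['repository.read', 'repository.write']
    #     max(granted, key=PERM_WEIGHTS.get)   # -> 'repository.write'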
1730 1730
1731 1731 permission_id = Column(Integer(), primary_key=True)
1732 1732 permission_name = Column(String(255), nullable=False)
1733 1733
1734 1734 def __unicode__(self):
1735 1735 return u"<%s('%s:%s')>" % (
1736 1736 self.__class__.__name__, self.permission_id, self.permission_name
1737 1737 )
1738 1738
1739 1739 @classmethod
1740 1740 def guess_instance(cls, value):
1741 1741 return super(Permission, cls).guess_instance(value, Permission.get_by_key)
1742 1742
1743 1743 @classmethod
1744 1744 def get_by_key(cls, key):
1745 1745 return cls.query().filter(cls.permission_name == key).scalar()
1746 1746
1747 1747 @classmethod
1748 1748 def get_default_perms(cls, default_user_id):
1749 1749 q = Session().query(UserRepoToPerm, Repository, cls) \
1750 1750 .join((Repository, UserRepoToPerm.repository_id == Repository.repo_id)) \
1751 1751 .join((cls, UserRepoToPerm.permission_id == cls.permission_id)) \
1752 1752 .filter(UserRepoToPerm.user_id == default_user_id)
1753 1753
1754 1754 return q.all()
1755 1755
1756 1756 @classmethod
1757 1757 def get_default_group_perms(cls, default_user_id):
1758 1758 q = Session().query(UserRepoGroupToPerm, RepoGroup, cls) \
1759 1759 .join((RepoGroup, UserRepoGroupToPerm.group_id == RepoGroup.group_id)) \
1760 1760 .join((cls, UserRepoGroupToPerm.permission_id == cls.permission_id)) \
1761 1761 .filter(UserRepoGroupToPerm.user_id == default_user_id)
1762 1762
1763 1763 return q.all()
1764 1764
1765 1765 @classmethod
1766 1766 def get_default_user_group_perms(cls, default_user_id):
1767 1767 q = Session().query(UserUserGroupToPerm, UserGroup, cls) \
1768 1768 .join((UserGroup, UserUserGroupToPerm.user_group_id == UserGroup.users_group_id)) \
1769 1769 .join((cls, UserUserGroupToPerm.permission_id == cls.permission_id)) \
1770 1770 .filter(UserUserGroupToPerm.user_id == default_user_id)
1771 1771
1772 1772 return q.all()
1773 1773
1774 1774
1775 1775 class UserRepoToPerm(Base, BaseDbModel):
1776 1776 __tablename__ = 'repo_to_perm'
1777 1777 __table_args__ = (
1778 1778 UniqueConstraint('user_id', 'repository_id', 'permission_id'),
1779 1779 _table_args_default_dict,
1780 1780 )
1781 1781
1782 1782 repo_to_perm_id = Column(Integer(), primary_key=True)
1783 1783 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
1784 1784 permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
1785 1785 repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
1786 1786
1787 1787 user = relationship('User')
1788 1788 repository = relationship('Repository')
1789 1789 permission = relationship('Permission')
1790 1790
1791 1791 @classmethod
1792 1792 def create(cls, user, repository, permission):
1793 1793 n = cls()
1794 1794 n.user = user
1795 1795 n.repository = repository
1796 1796 n.permission = permission
1797 1797 Session().add(n)
1798 1798 return n
1799 1799
1800 1800 def __unicode__(self):
1801 1801 return u'<%s => %s >' % (self.user, self.repository)
1802 1802
1803 1803
1804 1804 class UserUserGroupToPerm(Base, BaseDbModel):
1805 1805 __tablename__ = 'user_user_group_to_perm'
1806 1806 __table_args__ = (
1807 1807 UniqueConstraint('user_id', 'user_group_id', 'permission_id'),
1808 1808 _table_args_default_dict,
1809 1809 )
1810 1810
1811 1811 user_user_group_to_perm_id = Column(Integer(), primary_key=True)
1812 1812 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
1813 1813 permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
1814 1814 user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
1815 1815
1816 1816 user = relationship('User')
1817 1817 user_group = relationship('UserGroup')
1818 1818 permission = relationship('Permission')
1819 1819
1820 1820 @classmethod
1821 1821 def create(cls, user, user_group, permission):
1822 1822 n = cls()
1823 1823 n.user = user
1824 1824 n.user_group = user_group
1825 1825 n.permission = permission
1826 1826 Session().add(n)
1827 1827 return n
1828 1828
1829 1829 def __unicode__(self):
1830 1830 return u'<%s => %s >' % (self.user, self.user_group)
1831 1831
1832 1832
1833 1833 class UserToPerm(Base, BaseDbModel):
1834 1834 __tablename__ = 'user_to_perm'
1835 1835 __table_args__ = (
1836 1836 UniqueConstraint('user_id', 'permission_id'),
1837 1837 _table_args_default_dict,
1838 1838 )
1839 1839
1840 1840 user_to_perm_id = Column(Integer(), primary_key=True)
1841 1841 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
1842 1842 permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
1843 1843
1844 1844 user = relationship('User')
1845 1845 permission = relationship('Permission')
1846 1846
1847 1847 def __unicode__(self):
1848 1848 return u'<%s => %s >' % (self.user, self.permission)
1849 1849
1850 1850
1851 1851 class UserGroupRepoToPerm(Base, BaseDbModel):
1852 1852 __tablename__ = 'users_group_repo_to_perm'
1853 1853 __table_args__ = (
1854 1854 UniqueConstraint('repository_id', 'users_group_id', 'permission_id'),
1855 1855 _table_args_default_dict,
1856 1856 )
1857 1857
1858 1858 users_group_to_perm_id = Column(Integer(), primary_key=True)
1859 1859 users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
1860 1860 permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
1861 1861 repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
1862 1862
1863 1863 users_group = relationship('UserGroup')
1864 1864 permission = relationship('Permission')
1865 1865 repository = relationship('Repository')
1866 1866
1867 1867 @classmethod
1868 1868 def create(cls, users_group, repository, permission):
1869 1869 n = cls()
1870 1870 n.users_group = users_group
1871 1871 n.repository = repository
1872 1872 n.permission = permission
1873 1873 Session().add(n)
1874 1874 return n
1875 1875
1876 1876 def __unicode__(self):
1877 1877 return u'<UserGroupRepoToPerm:%s => %s >' % (self.users_group, self.repository)
1878 1878
1879 1879
1880 1880 class UserGroupUserGroupToPerm(Base, BaseDbModel):
1881 1881 __tablename__ = 'user_group_user_group_to_perm'
1882 1882 __table_args__ = (
1883 1883 UniqueConstraint('target_user_group_id', 'user_group_id', 'permission_id'),
1884 1884 _table_args_default_dict,
1885 1885 )
1886 1886
1887 1887 user_group_user_group_to_perm_id = Column(Integer(), primary_key=True)
1888 1888 target_user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
1889 1889 permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
1890 1890 user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
1891 1891
1892 1892 target_user_group = relationship('UserGroup', primaryjoin='UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id')
1893 1893 user_group = relationship('UserGroup', primaryjoin='UserGroupUserGroupToPerm.user_group_id==UserGroup.users_group_id')
1894 1894 permission = relationship('Permission')
1895 1895
1896 1896 @classmethod
1897 1897 def create(cls, target_user_group, user_group, permission):
1898 1898 n = cls()
1899 1899 n.target_user_group = target_user_group
1900 1900 n.user_group = user_group
1901 1901 n.permission = permission
1902 1902 Session().add(n)
1903 1903 return n
1904 1904
1905 1905 def __unicode__(self):
1906 1906 return u'<UserGroupUserGroup:%s => %s >' % (self.target_user_group, self.user_group)
1907 1907
1908 1908
1909 1909 class UserGroupToPerm(Base, BaseDbModel):
1910 1910 __tablename__ = 'users_group_to_perm'
1911 1911 __table_args__ = (
1912 1912 UniqueConstraint('users_group_id', 'permission_id',),
1913 1913 _table_args_default_dict,
1914 1914 )
1915 1915
1916 1916 users_group_to_perm_id = Column(Integer(), primary_key=True)
1917 1917 users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
1918 1918 permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
1919 1919
1920 1920 users_group = relationship('UserGroup')
1921 1921 permission = relationship('Permission')
1922 1922
1923 1923
1924 1924 class UserRepoGroupToPerm(Base, BaseDbModel):
1925 1925 __tablename__ = 'user_repo_group_to_perm'
1926 1926 __table_args__ = (
1927 1927 UniqueConstraint('user_id', 'group_id', 'permission_id'),
1928 1928 _table_args_default_dict,
1929 1929 )
1930 1930
1931 1931 group_to_perm_id = Column(Integer(), primary_key=True)
1932 1932 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
1933 1933 group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=False)
1934 1934 permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
1935 1935
1936 1936 user = relationship('User')
1937 1937 group = relationship('RepoGroup')
1938 1938 permission = relationship('Permission')
1939 1939
1940 1940 @classmethod
1941 1941 def create(cls, user, repository_group, permission):
1942 1942 n = cls()
1943 1943 n.user = user
1944 1944 n.group = repository_group
1945 1945 n.permission = permission
1946 1946 Session().add(n)
1947 1947 return n
1948 1948
1949 1949
1950 1950 class UserGroupRepoGroupToPerm(Base, BaseDbModel):
1951 1951 __tablename__ = 'users_group_repo_group_to_perm'
1952 1952 __table_args__ = (
1953 1953 UniqueConstraint('users_group_id', 'group_id'),
1954 1954 _table_args_default_dict,
1955 1955 )
1956 1956
1957 1957 users_group_repo_group_to_perm_id = Column(Integer(), primary_key=True)
1958 1958 users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
1959 1959 group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=False)
1960 1960 permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False)
1961 1961
1962 1962 users_group = relationship('UserGroup')
1963 1963 permission = relationship('Permission')
1964 1964 group = relationship('RepoGroup')
1965 1965
1966 1966 @classmethod
1967 1967 def create(cls, user_group, repository_group, permission):
1968 1968 n = cls()
1969 1969 n.users_group = user_group
1970 1970 n.group = repository_group
1971 1971 n.permission = permission
1972 1972 Session().add(n)
1973 1973 return n
1974 1974
1975 1975
1976 1976 class Statistics(Base, BaseDbModel):
1977 1977 __tablename__ = 'statistics'
1978 1978 __table_args__ = (
1979 1979 _table_args_default_dict,
1980 1980 )
1981 1981
1982 1982 stat_id = Column(Integer(), primary_key=True)
1983 1983 repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=True)
1984 1984 stat_on_revision = Column(Integer(), nullable=False)
1985 1985 commit_activity = Column(LargeBinary(1000000), nullable=False) # JSON data
1986 1986 commit_activity_combined = Column(LargeBinary(), nullable=False) # JSON data
1987 1987 languages = Column(LargeBinary(1000000), nullable=False) # JSON data
1988 1988
1989 1989 repository = relationship('Repository', single_parent=True)
1990 1990
1991 1991
1992 1992 class UserFollowing(Base, BaseDbModel):
1993 1993 __tablename__ = 'user_followings'
1994 1994 __table_args__ = (
1995 1995 UniqueConstraint('user_id', 'follows_repository_id', name='uq_user_followings_user_repo'),
1996 1996 UniqueConstraint('user_id', 'follows_user_id', name='uq_user_followings_user_user'),
1997 1997 _table_args_default_dict,
1998 1998 )
1999 1999
2000 2000 user_following_id = Column(Integer(), primary_key=True)
2001 2001 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
2002 2002 follows_repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
2003 2003 follows_user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
2004 2004 follows_from = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
2005 2005
2006 2006 user = relationship('User', primaryjoin='User.user_id==UserFollowing.user_id')
2007 2007
2008 2008 follows_user = relationship('User', primaryjoin='User.user_id==UserFollowing.follows_user_id')
2009 2009 follows_repository = relationship('Repository', order_by=lambda: func.lower(Repository.repo_name))
2010 2010
2011 2011 @classmethod
2012 2012 def get_repo_followers(cls, repo_id):
2013 2013 return cls.query().filter(cls.follows_repository_id == repo_id)
2014 2014
2015 2015
2016 2016 class CacheInvalidation(Base, BaseDbModel):
2017 2017 __tablename__ = 'cache_invalidation'
2018 2018 __table_args__ = (
2019 2019 Index('key_idx', 'cache_key'),
2020 2020 _table_args_default_dict,
2021 2021 )
2022 2022
2023 2023 # cache_id, not used
2024 2024 cache_id = Column(Integer(), primary_key=True)
2025 2025 # cache_key as created by _get_cache_key
2026 2026 cache_key = Column(Unicode(255), nullable=False, unique=True)
2027 2027 # cache_args is a repo_name
2028 2028 cache_args = Column(Unicode(255), nullable=False)
2029 2029 # instance sets cache_active True when it is caching, other instances set
2030 2030 # cache_active to False to indicate that this cache is invalid
2031 2031 cache_active = Column(Boolean(), nullable=False, default=False)
2032 2032
2033 2033 def __init__(self, cache_key, repo_name=''):
2034 2034 self.cache_key = cache_key
2035 2035 self.cache_args = repo_name
2036 2036 self.cache_active = False
2037 2037
2038 2038 def __unicode__(self):
2039 2039 return u"<%s('%s:%s[%s]')>" % (
2040 2040 self.__class__.__name__,
2041 2041 self.cache_id, self.cache_key, self.cache_active)
2042 2042
2043 2043 def _cache_key_partition(self):
2044 2044 prefix, repo_name, suffix = self.cache_key.partition(self.cache_args)
2045 2045 return prefix, repo_name, suffix
2046 2046
2047 2047 def get_prefix(self):
2048 2048 """
2049 2049 get prefix that might have been used in _get_cache_key to
2050 2050 generate self.cache_key. Only used for informational purposes
2051 2051 in repo_edit.html.
2052 2052 """
2053 2053 # prefix, repo_name, suffix
2054 2054 return self._cache_key_partition()[0]
2055 2055
2056 2056 def get_suffix(self):
2057 2057 """
2058 2058 get suffix that might have been used in _get_cache_key to
2059 2059 generate self.cache_key. Only used for informational purposes
2060 2060 in repo_edit.html.
2061 2061 """
2062 2062 # prefix, repo_name, suffix
2063 2063 return self._cache_key_partition()[2]
2064 2064
2065 2065 @classmethod
2066 2066 def clear_cache(cls):
2067 2067 """
2068 2068 Delete all cache keys from database.
2069 2069 Should only be run when all instances are down and all entries thus stale.
2070 2070 """
2071 2071 cls.query().delete()
2072 2072 Session().commit()
2073 2073
2074 2074 @classmethod
2075 2075 def _get_cache_key(cls, key):
2076 2076 """
2077 2077 Wrapper for generating a unique cache key for this instance and "key".
2078 2078 key must / will start with a repo_name, which will be stored in .cache_args.
2079 2079 """
2080 2080 prefix = kallithea.CONFIG.get('instance_id', '')
2081 2081 return "%s%s" % (prefix, key)
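
    # Illustration (the instance_id value is made up): with instance_id = 'prod1'
    # in the configuration, _get_cache_key('mygroup/myrepo') returns
    # 'prod1mygroup/myrepo'; without an instance_id it is just 'mygroup/myrepo'.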
2082 2082
2083 2083 @classmethod
2084 2084 def set_invalidate(cls, repo_name):
2085 2085 """
2086 2086 Mark all caches of a repo as invalid in the database.
2087 2087 """
2088 2088 inv_objs = Session().query(cls).filter(cls.cache_args == repo_name).all()
2089 2089 log.debug('for repo %s got %s invalidation objects',
2090 2090 safe_str(repo_name), inv_objs)
2091 2091
2092 2092 for inv_obj in inv_objs:
2093 2093 log.debug('marking %s key for invalidation based on repo_name=%s',
2094 2094 inv_obj, safe_str(repo_name))
2095 2095 Session().delete(inv_obj)
2096 2096 Session().commit()
2097 2097
2098 2098 @classmethod
2099 2099 def test_and_set_valid(cls, repo_name, kind, valid_cache_keys=None):
2100 2100 """
2101 2101 Mark this cache key as active and currently cached.
2102 2102 Return True if the existing cache registration was still valid.
2103 2103 Return False to indicate that it had been invalidated and caches should be refreshed.
2104 2104 """
2105 2105
2106 2106 key = (repo_name + '_' + kind) if kind else repo_name
2107 2107 cache_key = cls._get_cache_key(key)
2108 2108
2109 2109 if valid_cache_keys and cache_key in valid_cache_keys:
2110 2110 return True
2111 2111
2112 2112 inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
2113 2113 if inv_obj is None:
2114 2114 inv_obj = cls(cache_key, repo_name)
2115 2115 Session().add(inv_obj)
2116 2116 elif inv_obj.cache_active:
2117 2117 return True
2118 2118 inv_obj.cache_active = True
2119 2119 try:
2120 2120 Session().commit()
2121 2121 except sqlalchemy.exc.IntegrityError:
2122 2122 log.error('commit of CacheInvalidation failed - retrying')
2123 2123 Session().rollback()
2124 2124 inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
2125 2125 if inv_obj is None:
2126 2126 log.error('failed to create CacheInvalidation entry')
2127 2127 # TODO: fail badly?
2128 2128 # else: TOCTOU - another thread added the key at the same time; no further action required
2129 2129 return False
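
    # A hedged usage sketch (repo_name, repo and the rebuild step are placeholders):
    #
    #     valid_keys = CacheInvalidation.get_valid_cache_keys()
    #     if not CacheInvalidation.test_and_set_valid(repo_name, None,
    #                                                 valid_cache_keys=valid_keys):
    #         # the cache was invalidated elsewhere - rebuild the cached object
    #         scm_repo = repo.scm_instance_no_cache()
    #
    # This mirrors how Repository.scm_instance_cached() uses this method above.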
2130 2130
2131 2131 @classmethod
2132 2132 def get_valid_cache_keys(cls):
2133 2133 """
2134 2134 Return an opaque object with information about which caches are still valid
2135 2135 and can be used without checking for invalidation.
2136 2136 """
2137 2137 return set(inv_obj.cache_key for inv_obj in cls.query().filter(cls.cache_active).all())
2138 2138
2139 2139
2140 2140 class ChangesetComment(Base, BaseDbModel):
2141 2141 __tablename__ = 'changeset_comments'
2142 2142 __table_args__ = (
2143 2143 Index('cc_revision_idx', 'revision'),
2144 2144 Index('cc_pull_request_id_idx', 'pull_request_id'),
2145 2145 _table_args_default_dict,
2146 2146 )
2147 2147
2148 2148 comment_id = Column(Integer(), primary_key=True)
2149 2149 repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
2150 2150 revision = Column(String(40), nullable=True)
2151 2151 pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
2152 2152 line_no = Column(Unicode(10), nullable=True)
2153 2153 f_path = Column(Unicode(1000), nullable=True)
2154 2154 author_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
2155 2155 text = Column(UnicodeText(), nullable=False)
2156 2156 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
2157 2157 modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
2158 2158
2159 2159 author = relationship('User')
2160 2160 repo = relationship('Repository')
2161 2161 # status_change is frequently used directly in templates - make it a lazy
2162 2162 # join to avoid fetching each related ChangesetStatus on demand.
2163 2163 # There will only be one ChangesetStatus referencing each comment so the join will not explode.
2164 2164 status_change = relationship('ChangesetStatus',
2165 2165 cascade="all, delete-orphan", lazy='joined')
2166 2166 pull_request = relationship('PullRequest')
2167 2167
2168 2168 def url(self):
2169 2169 anchor = "comment-%s" % self.comment_id
2170 2170 import kallithea.lib.helpers as h
2171 2171 if self.revision:
2172 2172 return h.url('changeset_home', repo_name=self.repo.repo_name, revision=self.revision, anchor=anchor)
2173 2173 elif self.pull_request_id is not None:
2174 2174 return self.pull_request.url(anchor=anchor)
2175 2175
2176 2176 def __json__(self):
2177 2177 return dict(
2178 2178 comment_id=self.comment_id,
2179 2179 username=self.author.username,
2180 2180 text=self.text,
2181 2181 )
2182 2182
2183 2183 def deletable(self):
2184 2184 return self.created_on > datetime.datetime.now() - datetime.timedelta(minutes=5)
2185 2185
2186 2186
2187 2187 class ChangesetStatus(Base, BaseDbModel):
2188 2188 __tablename__ = 'changeset_statuses'
2189 2189 __table_args__ = (
2190 2190 Index('cs_revision_idx', 'revision'),
2191 2191 Index('cs_version_idx', 'version'),
2192 2192 Index('cs_pull_request_id_idx', 'pull_request_id'),
2193 2193 Index('cs_changeset_comment_id_idx', 'changeset_comment_id'),
2194 2194 Index('cs_pull_request_id_user_id_version_idx', 'pull_request_id', 'user_id', 'version'),
2195 2195 Index('cs_repo_id_pull_request_id_idx', 'repo_id', 'pull_request_id'),
2196 2196 UniqueConstraint('repo_id', 'revision', 'version'),
2197 2197 _table_args_default_dict,
2198 2198 )
2199 2199
2200 2200 STATUS_NOT_REVIEWED = DEFAULT = 'not_reviewed'
2201 2201 STATUS_APPROVED = 'approved'
2202 2202 STATUS_REJECTED = 'rejected' # is shown as "Not approved" - TODO: change database content / scheme
2203 2203 STATUS_UNDER_REVIEW = 'under_review'
2204 2204
2205 2205 STATUSES = [
2206 2206 (STATUS_NOT_REVIEWED, _("Not reviewed")), # (no icon) and default
2207 2207 (STATUS_UNDER_REVIEW, _("Under review")),
2208 2208 (STATUS_REJECTED, _("Not approved")),
2209 2209 (STATUS_APPROVED, _("Approved")),
2210 2210 ]
2211 2211 STATUSES_DICT = dict(STATUSES)
2212 2212
2213 2213 changeset_status_id = Column(Integer(), primary_key=True)
2214 2214 repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
2215 2215 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
2216 2216 revision = Column(String(40), nullable=True)
2217 2217 status = Column(String(128), nullable=False, default=DEFAULT)
2218 2218 comment_id = Column('changeset_comment_id', Integer(), ForeignKey('changeset_comments.comment_id'), nullable=False)
2219 2219 modified_at = Column(DateTime(), nullable=False, default=datetime.datetime.now)
2220 2220 version = Column(Integer(), nullable=False, default=0)
2221 2221 pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
2222 2222
2223 2223 author = relationship('User')
2224 2224 repo = relationship('Repository')
2225 2225 comment = relationship('ChangesetComment')
2226 2226 pull_request = relationship('PullRequest')
2227 2227
2228 2228 def __unicode__(self):
2229 2229 return u"<%s('%s:%s')>" % (
2230 2230 self.__class__.__name__,
2231 2231 self.status, self.author
2232 2232 )
2233 2233
2234 2234 @classmethod
2235 2235 def get_status_lbl(cls, value):
2236 2236 return cls.STATUSES_DICT.get(value)
2237 2237
2238 2238 @property
2239 2239 def status_lbl(self):
2240 2240 return ChangesetStatus.get_status_lbl(self.status)
2241 2241
2242 2242 def __json__(self):
2243 2243 return dict(
2244 2244 status=self.status,
2245 2245 modified_at=self.modified_at.replace(microsecond=0),
2246 2246 reviewer=self.author.username,
2247 2247 )
2248 2248
2249 2249
2250 2250 class PullRequest(Base, BaseDbModel):
2251 2251 __tablename__ = 'pull_requests'
2252 2252 __table_args__ = (
2253 2253 Index('pr_org_repo_id_idx', 'org_repo_id'),
2254 2254 Index('pr_other_repo_id_idx', 'other_repo_id'),
2255 2255 _table_args_default_dict,
2256 2256 )
2257 2257
2258 2258 # values for .status
2259 2259 STATUS_NEW = u'new'
2260 2260 STATUS_CLOSED = u'closed'
2261 2261
2262 2262 pull_request_id = Column(Integer(), primary_key=True)
2263 2263 title = Column(Unicode(255), nullable=False)
2264 2264 description = Column(UnicodeText(), nullable=False)
2265 2265 status = Column(Unicode(255), nullable=False, default=STATUS_NEW) # only for closedness, not approve/reject/etc
2266 2266 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
2267 2267 updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
2268 2268 owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
2269 2269 _revisions = Column('revisions', UnicodeText(), nullable=False)
2270 2270 org_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
2271 2271 org_ref = Column(Unicode(255), nullable=False)
2272 2272 other_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
2273 2273 other_ref = Column(Unicode(255), nullable=False)
2274 2274
2275 2275 @hybrid_property
2276 2276 def revisions(self):
2277 2277 return self._revisions.split(':')
2278 2278
2279 2279 @revisions.setter
2280 2280 def revisions(self, val):
2281 2281 self._revisions = safe_unicode(':'.join(val))
2282 2282
2283 2283 @property
2284 2284 def org_ref_parts(self):
2285 2285 return self.org_ref.split(':')
2286 2286
2287 2287 @property
2288 2288 def other_ref_parts(self):
2289 2289 return self.other_ref.split(':')
2290 2290
2291 2291 owner = relationship('User')
2292 2292 reviewers = relationship('PullRequestReviewer',
2293 2293 cascade="all, delete-orphan")
2294 2294 org_repo = relationship('Repository', primaryjoin='PullRequest.org_repo_id==Repository.repo_id')
2295 2295 other_repo = relationship('Repository', primaryjoin='PullRequest.other_repo_id==Repository.repo_id')
2296 2296 statuses = relationship('ChangesetStatus', order_by='ChangesetStatus.changeset_status_id')
2297 2297 comments = relationship('ChangesetComment', order_by='ChangesetComment.comment_id',
2298 2298 cascade="all, delete-orphan")
2299 2299
2300 2300 @classmethod
2301 2301 def query(cls, reviewer_id=None, include_closed=True, sorted=False):
2302 2302 """Add PullRequest-specific helpers for common query constructs.
2303 2303
2304 2304 reviewer_id: only PRs with the specified user added as reviewer.
2305 2305
2306 2306 include_closed: if False, do not include closed PRs.
2307 2307
2308 2308 sorted: if True, apply the default ordering (newest first).
2309 2309 """
2310 2310 q = super(PullRequest, cls).query()
2311 2311
2312 2312 if reviewer_id is not None:
2313 2313 q = q.join(PullRequestReviewer).filter(PullRequestReviewer.user_id == reviewer_id)
2314 2314
2315 2315 if not include_closed:
2316 2316 q = q.filter(PullRequest.status != PullRequest.STATUS_CLOSED)
2317 2317
2318 2318 if sorted:
2319 2319 q = q.order_by(PullRequest.created_on.desc())
2320 2320
2321 2321 return q
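
    # A minimal usage sketch (user is assumed to be a User instance):
    #
    #     prs = PullRequest.query(reviewer_id=user.user_id,
    #                             include_closed=False,
    #                             sorted=True).all()
    #
    # i.e. open pull requests the given user has been asked to review, newest first.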
2322 2322
2323 2323 def get_reviewer_users(self):
2324 2324 """Like .reviewers, but actually returning the users"""
2325 2325 return User.query() \
2326 2326 .join(PullRequestReviewer) \
2327 2327 .filter(PullRequestReviewer.pull_request == self) \
2328 2328 .order_by(PullRequestReviewer.pull_request_reviewers_id) \
2329 2329 .all()
2330 2330
2331 2331 def is_closed(self):
2332 2332 return self.status == self.STATUS_CLOSED
2333 2333
2334 2334 def user_review_status(self, user_id):
2335 2335 """Return the user's latest status vote on the PR"""
2336 2336 # note: no filtering on repo - that would be redundant
2337 2337 status = ChangesetStatus.query() \
2338 2338 .filter(ChangesetStatus.pull_request == self) \
2339 2339 .filter(ChangesetStatus.user_id == user_id) \
2340 2340 .order_by(ChangesetStatus.version) \
2341 2341 .first()
2342 2342 return str(status.status) if status else ''
2343 2343
2344 2344 @classmethod
2345 2345 def make_nice_id(cls, pull_request_id):
2346 2346 '''Return pull request id nicely formatted for displaying'''
2347 2347 return '#%s' % pull_request_id
2348 2348
2349 2349 def nice_id(self):
2350 2350 '''Return the id of this pull request, nicely formatted for displaying'''
2351 2351 return self.make_nice_id(self.pull_request_id)
2352 2352
2353 2353 def get_api_data(self):
2354 2354 return self.__json__()
2355 2355
2356 2356 def __json__(self):
2357 2357 clone_uri_tmpl = kallithea.CONFIG.get('clone_uri_tmpl') or Repository.DEFAULT_CLONE_URI
2358 2358 return dict(
2359 2359 pull_request_id=self.pull_request_id,
2360 2360 url=self.url(),
2361 2361 reviewers=self.reviewers,
2362 2362 revisions=self.revisions,
2363 2363 owner=self.owner.username,
2364 2364 title=self.title,
2365 2365 description=self.description,
2366 2366 org_repo_url=self.org_repo.clone_url(clone_uri_tmpl=clone_uri_tmpl),
2367 2367 org_ref_parts=self.org_ref_parts,
2368 2368 other_ref_parts=self.other_ref_parts,
2369 2369 status=self.status,
2370 2370 comments=self.comments,
2371 2371 statuses=self.statuses,
2372 2372 )
2373 2373
2374 2374 def url(self, **kwargs):
2375 2375 canonical = kwargs.pop('canonical', None)
2376 2376 import kallithea.lib.helpers as h
2377 2377 b = self.org_ref_parts[1]
2378 2378 if b != self.other_ref_parts[1]:
2379 2379 s = '/_/' + b
2380 2380 else:
2381 2381 s = '/_/' + self.title
2382 2382 kwargs['extra'] = urlreadable(s)
2383 2383 if canonical:
2384 2384 return h.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
2385 2385 pull_request_id=self.pull_request_id, **kwargs)
2386 2386 return h.url('pullrequest_show', repo_name=self.other_repo.repo_name,
2387 2387 pull_request_id=self.pull_request_id, **kwargs)
2388 2388
2389 2389
2390 2390 class PullRequestReviewer(Base, BaseDbModel):
2391 2391 __tablename__ = 'pull_request_reviewers'
2392 2392 __table_args__ = (
2393 2393 Index('pull_request_reviewers_user_id_idx', 'user_id'),
2394 2394 _table_args_default_dict,
2395 2395 )
2396 2396
2397 2397 def __init__(self, user=None, pull_request=None):
2398 2398 self.user = user
2399 2399 self.pull_request = pull_request
2400 2400
2401 2401 pull_request_reviewers_id = Column('pull_requests_reviewers_id', Integer(), primary_key=True)
2402 2402 pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=False)
2403 2403 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
2404 2404
2405 2405 user = relationship('User')
2406 2406 pull_request = relationship('PullRequest')
2407 2407
2408 2408 def __json__(self):
2409 2409 return dict(
2410 2410 username=self.user.username if self.user else None,
2411 2411 )
2412 2412
2413 2413
2414 2414 class Notification(object):
2415 2415 __tablename__ = 'notifications'
2416 2416
2417 2417 class UserNotification(object):
2418 2418 __tablename__ = 'user_to_notification'
2419 2419
2420 2420
2421 2421 class Gist(Base, BaseDbModel):
2422 2422 __tablename__ = 'gists'
2423 2423 __table_args__ = (
2424 2424 Index('g_gist_access_id_idx', 'gist_access_id'),
2425 2425 Index('g_created_on_idx', 'created_on'),
2426 2426 _table_args_default_dict,
2427 2427 )
2428 2428
2429 2429 GIST_PUBLIC = u'public'
2430 2430 GIST_PRIVATE = u'private'
2431 2431 DEFAULT_FILENAME = u'gistfile1.txt'
2432 2432
2433 2433 gist_id = Column(Integer(), primary_key=True)
2434 2434 gist_access_id = Column(Unicode(250), nullable=False)
2435 2435 gist_description = Column(UnicodeText(), nullable=False)
2436 2436 owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
2437 2437 gist_expires = Column(Float(53), nullable=False)
2438 2438 gist_type = Column(Unicode(128), nullable=False)
2439 2439 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
2440 2440 modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
2441 2441
2442 2442 owner = relationship('User')
2443 2443
2444 2444 @hybrid_property
2445 2445 def is_expired(self):
2446 2446 return (self.gist_expires != -1) & (time.time() > self.gist_expires)
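    # note: '&' rather than 'and' is likely used so the same expression also
    # works as a SQL filter when accessed on the class (hybrid_property).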
2447 2447
2448 2448 def __repr__(self):
2449 2449 return '<Gist:[%s]%s>' % (self.gist_type, self.gist_access_id)
2450 2450
2451 2451 @classmethod
2452 2452 def guess_instance(cls, value):
2453 2453 return super(Gist, cls).guess_instance(value, Gist.get_by_access_id)
2454 2454
2455 2455 @classmethod
2456 2456 def get_or_404(cls, id_):
2457 2457 res = cls.query().filter(cls.gist_access_id == id_).scalar()
2458 2458 if res is None:
2459 2459 raise HTTPNotFound
2460 2460 return res
2461 2461
2462 2462 @classmethod
2463 2463 def get_by_access_id(cls, gist_access_id):
2464 2464 return cls.query().filter(cls.gist_access_id == gist_access_id).scalar()
2465 2465
2466 2466 def gist_url(self):
2467 2467 alias_url = kallithea.CONFIG.get('gist_alias_url')
2468 2468 if alias_url:
2469 2469 return alias_url.replace('{gistid}', self.gist_access_id)
2470 2470
2471 2471 import kallithea.lib.helpers as h
2472 2472 return h.canonical_url('gist', gist_id=self.gist_access_id)
2473 2473
2474 2474 @classmethod
2475 2475 def base_path(cls):
2476 2476 """
2477 2477 Returns base path where all gists are stored
2478 2478
2479 2479 :param cls:
2480 2480 """
2481 2481 from kallithea.model.gist import GIST_STORE_LOC
2482 2482 q = Session().query(Ui) \
2483 2483 .filter(Ui.ui_key == URL_SEP)
2484 2484 q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
2485 2485 return os.path.join(q.one().ui_value, GIST_STORE_LOC)
2486 2486
2487 2487 def get_api_data(self):
2488 2488 """
2489 2489 Common function for generating gist related data for API
2490 2490 """
2491 2491 gist = self
2492 2492 data = dict(
2493 2493 gist_id=gist.gist_id,
2494 2494 type=gist.gist_type,
2495 2495 access_id=gist.gist_access_id,
2496 2496 description=gist.gist_description,
2497 2497 url=gist.gist_url(),
2498 2498 expires=gist.gist_expires,
2499 2499 created_on=gist.created_on,
2500 2500 )
2501 2501 return data
2502 2502
2503 2503 def __json__(self):
2504 2504 data = dict(
2505 2505 )
2506 2506 data.update(self.get_api_data())
2507 2507 return data
2508 2508 ## SCM functions
2509 2509
2510 2510 @property
2511 2511 def scm_instance(self):
2512 2512 from kallithea.lib.vcs import get_repo
2513 2513 base_path = self.base_path()
2514 2514 return get_repo(os.path.join(*map(safe_str,
2515 2515 [base_path, self.gist_access_id])))
2516 2516
2517 2517
2518 2518 class UserSshKeys(Base, BaseDbModel):
2519 2519 __tablename__ = 'user_ssh_keys'
2520 2520 __table_args__ = (
2521 2521 Index('usk_public_key_idx', 'public_key'),
2522 2522 Index('usk_fingerprint_idx', 'fingerprint'),
2523 2523 UniqueConstraint('fingerprint'),
2524 2524 _table_args_default_dict
2525 2525 )
2526 2526 __mapper_args__ = {}
2527 2527
2528 2528 user_ssh_key_id = Column(Integer(), primary_key=True)
2529 2529 user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
2530 2530 _public_key = Column('public_key', UnicodeText(), nullable=False)
2531 2531 description = Column(UnicodeText(), nullable=False)
2532 2532 fingerprint = Column(String(255), nullable=False, unique=True)
2533 2533 created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
2534 2534 last_seen = Column(DateTime(timezone=False), nullable=True)
2535 2535
2536 2536 user = relationship('User')
2537 2537
2538 2538 @property
2539 2539 def public_key(self):
2540 2540 return self._public_key
2541 2541
2542 2542 @public_key.setter
2543 2543 def public_key(self, full_key):
2544 2544 # the full public key is too long to be suitable as database key - instead,
2545 2545 # use fingerprints similar to 'ssh-keygen -E sha256 -lf ~/.ssh/id_rsa.pub'
2546 2546 self._public_key = full_key
2547 2547 enc_key = full_key.split(" ")[1]
2548 2548 self.fingerprint = hashlib.sha256(enc_key.decode('base64')).digest().encode('base64').replace('\n', '').rstrip('=')
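The stored fingerprint is the SHA-256 digest of the decoded key blob, re-encoded as unpadded base64, which should correspond to the part after the 'SHA256:' prefix in the ssh-keygen output mentioned in the comment. A rough Python 2 flavored sketch of the same computation using the base64 module instead of the string codecs (an illustration, not the code used here):

    import base64
    import hashlib

    def ssh_key_fingerprint(full_key):
        # full_key looks like 'ssh-rsa AAAA... comment'; the second field is the base64 blob
        enc_key = full_key.split(' ')[1]
        digest = hashlib.sha256(base64.b64decode(enc_key)).digest()
        return base64.b64encode(digest).rstrip('=')  # unpadded base64, as stored above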
@@ -1,868 +1,868 b''
1 1 import datetime
2 2 import os
3 3 import sys
4 4 import urllib2
5 5
6 6 import mock
7 7 import pytest
8 8
9 9 from kallithea.lib.vcs.backends.git import GitChangeset, GitRepository
10 10 from kallithea.lib.vcs.exceptions import NodeDoesNotExistError, RepositoryError, VCSError
11 11 from kallithea.lib.vcs.nodes import DirNode, FileNode, NodeKind, NodeState
12 12 from kallithea.model.scm import ScmModel
13 13 from kallithea.tests.vcs.base import _BackendTestMixin
14 14 from kallithea.tests.vcs.conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, TESTS_TMP_PATH, get_new_dir
15 15
16 16
17 17 class TestGitRepository(object):
18 18
19 19 def __check_for_existing_repo(self):
20 20 if os.path.exists(TEST_GIT_REPO_CLONE):
21 21 pytest.fail('Cannot test git clone repo as location %s already '
22 22 'exists. You should manually remove it first.'
23 23 % TEST_GIT_REPO_CLONE)
24 24
25 25 def setup_method(self):
26 26 self.repo = GitRepository(TEST_GIT_REPO)
27 27
28 28 def test_wrong_repo_path(self):
29 29 wrong_repo_path = os.path.join(TESTS_TMP_PATH, 'errorrepo')
30 30 with pytest.raises(RepositoryError):
31 31 GitRepository(wrong_repo_path)
32 32
33 33 def test_git_cmd_injection(self):
34 34 repo_inject_path = TEST_GIT_REPO + '; echo "Cake";'
35 35 with pytest.raises(urllib2.URLError):
36 36 # Should fail because URL will contain the parts after ; too
37 37 GitRepository(get_new_dir('injection-repo'), src_url=repo_inject_path, update_after_clone=True, create=True)
38 38
39 39 with pytest.raises(RepositoryError):
40 40             # Should fail on the direct clone() call, which at the time of writing only happens inside the class
41 41 clone_fail_repo = GitRepository(get_new_dir('injection-repo'), create=True)
42 42 clone_fail_repo.clone(repo_inject_path, update_after_clone=True,)
43 43
44 44 # Verify correct quoting of evil characters that should work on posix file systems
45 45 if sys.platform == 'win32':
46 46 # windows does not allow '"' in dir names
47 47 # and some versions of the git client don't like ` and '
48 48 tricky_path = get_new_dir("tricky-path-repo-$")
49 49 else:
50 50 tricky_path = get_new_dir("tricky-path-repo-$'\"`")
51 51 successfully_cloned = GitRepository(tricky_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
52 52 # Repo should have been created
53 53 assert not successfully_cloned._repo.bare
54 54
55 55 if sys.platform == 'win32':
56 56 # windows does not allow '"' in dir names
57 57 # and some versions of the git client don't like ` and '
58 58 tricky_path_2 = get_new_dir("tricky-path-2-repo-$")
59 59 else:
60 60 tricky_path_2 = get_new_dir("tricky-path-2-repo-$'\"`")
61 61 successfully_cloned2 = GitRepository(tricky_path_2, src_url=tricky_path, bare=True, create=True)
62 62 # Repo should have been created and thus used correct quoting for clone
63 63 assert successfully_cloned2._repo.bare
64 64
65 65 # Should pass because URL has been properly quoted
66 66 successfully_cloned.pull(tricky_path_2)
67 67 successfully_cloned2.fetch(tricky_path)
68 68
69 69 def test_repo_create_with_spaces_in_path(self):
70 70 repo_path = get_new_dir("path with spaces")
71 71 repo = GitRepository(repo_path, src_url=None, bare=True, create=True)
72 72 # Repo should have been created
73 73 assert repo._repo.bare
74 74
75 75 def test_repo_clone(self):
76 76 self.__check_for_existing_repo()
77 77 repo = GitRepository(TEST_GIT_REPO)
78 78 repo_clone = GitRepository(TEST_GIT_REPO_CLONE,
79 79 src_url=TEST_GIT_REPO, create=True, update_after_clone=True)
80 80 assert len(repo.revisions) == len(repo_clone.revisions)
81 81 # Checking hashes of changesets should be enough
82 82 for changeset in repo.get_changesets():
83 83 raw_id = changeset.raw_id
84 84 assert raw_id == repo_clone.get_changeset(raw_id).raw_id
85 85
86 86 def test_repo_clone_with_spaces_in_path(self):
87 87 repo_path = get_new_dir("path with spaces")
88 88 successfully_cloned = GitRepository(repo_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
89 89 # Repo should have been created
90 90 assert not successfully_cloned._repo.bare
91 91
92 92 successfully_cloned.pull(TEST_GIT_REPO)
93 93 self.repo.fetch(repo_path)
94 94
95 95 def test_repo_clone_without_create(self):
96 96 with pytest.raises(RepositoryError):
97 97 GitRepository(TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
98 98
99 99 def test_repo_clone_with_update(self):
100 100 repo = GitRepository(TEST_GIT_REPO)
101 101 clone_path = TEST_GIT_REPO_CLONE + '_with_update'
102 102 repo_clone = GitRepository(clone_path,
103 103 create=True, src_url=TEST_GIT_REPO, update_after_clone=True)
104 104 assert len(repo.revisions) == len(repo_clone.revisions)
105 105
106 106 # check if current workdir was updated
107 107 fpath = os.path.join(clone_path, 'MANIFEST.in')
108 108         assert os.path.isfile(fpath), 'Repo was cloned and updated but file %s could not be found' % fpath
109 109
110 110 def test_repo_clone_without_update(self):
111 111 repo = GitRepository(TEST_GIT_REPO)
112 112 clone_path = TEST_GIT_REPO_CLONE + '_without_update'
113 113 repo_clone = GitRepository(clone_path,
114 114 create=True, src_url=TEST_GIT_REPO, update_after_clone=False)
115 115 assert len(repo.revisions) == len(repo_clone.revisions)
116 116 # check if current workdir was *NOT* updated
117 117 fpath = os.path.join(clone_path, 'MANIFEST.in')
118 118 # Make sure it's not bare repo
119 119 assert not repo_clone._repo.bare
120 120         assert not os.path.isfile(fpath), 'Repo was cloned without update but file %s was found' % fpath
121 121
122 122 def test_repo_clone_into_bare_repo(self):
123 123 repo = GitRepository(TEST_GIT_REPO)
124 124 clone_path = TEST_GIT_REPO_CLONE + '_bare.git'
125 125 repo_clone = GitRepository(clone_path, create=True,
126 126 src_url=repo.path, bare=True)
127 127 assert repo_clone._repo.bare
128 128
129 129 def test_create_repo_is_not_bare_by_default(self):
130 130 repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
131 131 assert not repo._repo.bare
132 132
133 133 def test_create_bare_repo(self):
134 134 repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
135 135 assert repo._repo.bare
136 136
137 137 def test_revisions(self):
138 138 # there are 112 revisions (by now)
139 139 # so we can assume they would be available from now on
140 140 subset = set([
141 141 'c1214f7e79e02fc37156ff215cd71275450cffc3',
142 142 '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
143 143 'fa6600f6848800641328adbf7811fd2372c02ab2',
144 144 '102607b09cdd60e2793929c4f90478be29f85a17',
145 145 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
146 146 '2d1028c054665b962fa3d307adfc923ddd528038',
147 147 'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
148 148 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
149 149 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
150 150 '8430a588b43b5d6da365400117c89400326e7992',
151 151 'd955cd312c17b02143c04fa1099a352b04368118',
152 152 'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
153 153 'add63e382e4aabc9e1afdc4bdc24506c269b7618',
154 154 'f298fe1189f1b69779a4423f40b48edf92a703fc',
155 155 'bd9b619eb41994cac43d67cf4ccc8399c1125808',
156 156 '6e125e7c890379446e98980d8ed60fba87d0f6d1',
157 157 'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
158 158 '0b05e4ed56c802098dfc813cbe779b2f49e92500',
159 159 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
160 160 '45223f8f114c64bf4d6f853e3c35a369a6305520',
161 161 'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
162 162 'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
163 163 '27d48942240f5b91dfda77accd2caac94708cc7d',
164 164 '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
165 165 'e686b958768ee96af8029fe19c6050b1a8dd3b2b'])
166 166 assert subset.issubset(set(self.repo.revisions))
167 167
168 168 def test_slicing(self):
169 169 # 4 1 5 10 95
170 170 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
171 171 (10, 20, 10), (5, 100, 95)]:
172 172 revs = list(self.repo[sfrom:sto])
173 173 assert len(revs) == size
174 174 assert revs[0] == self.repo.get_changeset(sfrom)
175 175 assert revs[-1] == self.repo.get_changeset(sto - 1)
176 176
177 177 def test_branches(self):
178 178 # TODO: Need more tests here
179 179 # Removed (those are 'remotes' branches for cloned repo)
180 180 #assert 'master' in self.repo.branches
181 181 #assert 'gittree' in self.repo.branches
182 182 #assert 'web-branch' in self.repo.branches
183 183 for name, id in self.repo.branches.items():
184 184 assert isinstance(self.repo.get_changeset(id), GitChangeset)
185 185
186 186 def test_tags(self):
187 187 # TODO: Need more tests here
188 188 assert 'v0.1.1' in self.repo.tags
189 189 assert 'v0.1.2' in self.repo.tags
190 190 for name, id in self.repo.tags.items():
191 191 assert isinstance(self.repo.get_changeset(id), GitChangeset)
192 192
193 193 def _test_single_changeset_cache(self, revision):
194 194 chset = self.repo.get_changeset(revision)
195 195 assert revision in self.repo.changesets
196 196 assert chset is self.repo.changesets[revision]
197 197
198 198 def test_initial_changeset(self):
199 199 id = self.repo.revisions[0]
200 200 init_chset = self.repo.get_changeset(id)
201 201 assert init_chset.message == 'initial import\n'
202 202 assert init_chset.author == 'Marcin Kuzminski <marcin@python-blog.com>'
203 203 for path in ('vcs/__init__.py',
204 204 'vcs/backends/BaseRepository.py',
205 205 'vcs/backends/__init__.py'):
206 206 assert isinstance(init_chset.get_node(path), FileNode)
207 207 for path in ('', 'vcs', 'vcs/backends'):
208 208 assert isinstance(init_chset.get_node(path), DirNode)
209 209
210 210 with pytest.raises(NodeDoesNotExistError):
211 211 init_chset.get_node(path='foobar')
212 212
213 213 node = init_chset.get_node('vcs/')
214 214 assert hasattr(node, 'kind')
215 215 assert node.kind == NodeKind.DIR
216 216
217 217 node = init_chset.get_node('vcs')
218 218 assert hasattr(node, 'kind')
219 219 assert node.kind == NodeKind.DIR
220 220
221 221 node = init_chset.get_node('vcs/__init__.py')
222 222 assert hasattr(node, 'kind')
223 223 assert node.kind == NodeKind.FILE
224 224
225 225 def test_not_existing_changeset(self):
226 226 with pytest.raises(RepositoryError):
227 227 self.repo.get_changeset('f' * 40)
228 228
229 229 def test_changeset10(self):
230 230
231 231 chset10 = self.repo.get_changeset(self.repo.revisions[9])
232 232 readme = """===
233 233 VCS
234 234 ===
235 235
236 236 Various Version Control System management abstraction layer for Python.
237 237
238 238 Introduction
239 239 ------------
240 240
241 241 TODO: To be written...
242 242
243 243 """
244 244 node = chset10.get_node('README.rst')
245 245 assert node.kind == NodeKind.FILE
246 246 assert node.content == readme
247 247
248 248
249 249 class TestGitChangeset(object):
250 250
251 251 def setup_method(self):
252 252 self.repo = GitRepository(TEST_GIT_REPO)
253 253
254 254 def test_default_changeset(self):
255 255 tip = self.repo.get_changeset()
256 256 assert tip == self.repo.get_changeset(None)
257 257 assert tip == self.repo.get_changeset('tip')
258 258
259 259 def test_root_node(self):
260 260 tip = self.repo.get_changeset()
261 261 assert tip.root is tip.get_node('')
262 262
263 263 def test_lazy_fetch(self):
264 264 """
265 265         Test that a changeset's nodes expand and are cached as we walk through
266 266         the revision. This test is somewhat hard to write as the order of
267 267         operations is key here. Written by running command after command in a shell.
268 268 """
269 269 commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
270 270 assert commit_id in self.repo.revisions
271 271 chset = self.repo.get_changeset(commit_id)
272 272 assert len(chset.nodes) == 0
273 273 root = chset.root
274 274 assert len(chset.nodes) == 1
275 275 assert len(root.nodes) == 8
276 276 # accessing root.nodes updates chset.nodes
277 277 assert len(chset.nodes) == 9
278 278
279 279 docs = root.get_node('docs')
280 280 # we haven't yet accessed anything new as docs dir was already cached
281 281 assert len(chset.nodes) == 9
282 282 assert len(docs.nodes) == 8
283 283 # accessing docs.nodes updates chset.nodes
284 284 assert len(chset.nodes) == 17
285 285
286 286 assert docs is chset.get_node('docs')
287 287 assert docs is root.nodes[0]
288 288 assert docs is root.dirs[0]
289 289 assert docs is chset.get_node('docs')
290 290
291 291 def test_nodes_with_changeset(self):
292 292 commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
293 293 chset = self.repo.get_changeset(commit_id)
294 294 root = chset.root
295 295 docs = root.get_node('docs')
296 296 assert docs is chset.get_node('docs')
297 297 api = docs.get_node('api')
298 298 assert api is chset.get_node('docs/api')
299 299 index = api.get_node('index.rst')
300 300 assert index is chset.get_node('docs/api/index.rst')
301 301 assert index is chset.get_node('docs') \
302 302 .get_node('api') \
303 303 .get_node('index.rst')
304 304
305 305 def test_branch_and_tags(self):
306 306 # Those tests seem to show wrong results:
307 307 # in Git, only heads have a branch - most changesets don't
308 308 rev0 = self.repo.revisions[0]
309 309 chset0 = self.repo.get_changeset(rev0)
310 310 assert chset0.branch is None # should be 'master'?
311 311 assert chset0.branches == [] # should be 'master'?
312 312 assert chset0.tags == []
313 313
314 314 rev10 = self.repo.revisions[10]
315 315 chset10 = self.repo.get_changeset(rev10)
316 316 assert chset10.branch is None # should be 'master'?
317 317 assert chset10.branches == [] # should be 'master'?
318 318 assert chset10.tags == []
319 319
320 320 rev44 = self.repo.revisions[44]
321 321 chset44 = self.repo.get_changeset(rev44)
322 322 assert chset44.branch is None # should be 'web-branch'?
323 323 assert chset44.branches == [] # should be 'web-branch'?
324 324
325 325 tip = self.repo.get_changeset('tip')
326 326         assert 'tip' not in tip.tags # should it be?
327 327         assert not tip.tags # this is the current behavior
328 328
329 329 def _test_slices(self, limit, offset):
330 330 count = self.repo.count()
331 331 changesets = self.repo.get_changesets(limit=limit, offset=offset)
332 332 idx = 0
333 333 for changeset in changesets:
334 334 rev = offset + idx
335 335 idx += 1
336 336 rev_id = self.repo.revisions[rev]
337 337 if idx > limit:
338 338 pytest.fail("Exceeded limit already (getting revision %s, "
339 339 "there are %s total revisions, offset=%s, limit=%s)"
340 340 % (rev_id, count, offset, limit))
341 341 assert changeset == self.repo.get_changeset(rev_id)
342 342 result = list(self.repo.get_changesets(limit=limit, offset=offset))
343 343 start = offset
344 344 end = limit and offset + limit or None
345 345 sliced = list(self.repo[start:end])
346 346         assert result == sliced, (
347 347             "Comparison failed for limit=%s, offset=%s "
348 348             "(get_changeset returned: %s and sliced: %s)"
349 349             % (limit, offset, result, sliced))
350 350
351 351 def _test_file_size(self, revision, path, size):
352 352 node = self.repo.get_changeset(revision).get_node(path)
353 353 assert node.is_file()
354 354 assert node.size == size
355 355
356 356 def test_file_size(self):
357 357 to_check = (
358 358 ('c1214f7e79e02fc37156ff215cd71275450cffc3',
359 359 'vcs/backends/BaseRepository.py', 502),
360 360 ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
361 361 'vcs/backends/hg.py', 854),
362 362 ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
363 363 'setup.py', 1068),
364 364 ('d955cd312c17b02143c04fa1099a352b04368118',
365 365 'vcs/backends/base.py', 2921),
366 366 ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
367 367 'vcs/backends/base.py', 3936),
368 368 ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
369 369 'vcs/backends/base.py', 6189),
370 370 )
371 371 for revision, path, size in to_check:
372 372 self._test_file_size(revision, path, size)
373 373
374 374 def _test_dir_size(self, revision, path, size):
375 375 node = self.repo.get_changeset(revision).get_node(path)
376 376 assert node.size == size
377 377
378 378 def test_dir_size(self):
379 379 to_check = (
380 380 ('5f2c6ee195929b0be80749243c18121c9864a3b3', '/', 674076),
381 381 ('7ab37bc680b4aa72c34d07b230c866c28e9fc204', '/', 674049),
382 382 ('6892503fb8f2a552cef5f4d4cc2cdbd13ae1cd2f', '/', 671830),
383 383 )
384 384 for revision, path, size in to_check:
385 385 self._test_dir_size(revision, path, size)
386 386
387 387 def test_repo_size(self):
388 388 assert self.repo.size == 674076
389 389
390 390 def test_file_history(self):
391 391 # we can only check if those revisions are present in the history
392 392         # as we cannot update this test every time the file is changed
393 393 files = {
394 394 'setup.py': [
395 395 '54386793436c938cff89326944d4c2702340037d',
396 396 '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
397 397 '998ed409c795fec2012b1c0ca054d99888b22090',
398 398 '5e0eb4c47f56564395f76333f319d26c79e2fb09',
399 399 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
400 400 '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
401 401 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
402 402 '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
403 403 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
404 404 ],
405 405 'vcs/nodes.py': [
406 406 '33fa3223355104431402a888fa77a4e9956feb3e',
407 407 'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
408 408 'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
409 409 'ab5721ca0a081f26bf43d9051e615af2cc99952f',
410 410 'c877b68d18e792a66b7f4c529ea02c8f80801542',
411 411 '4313566d2e417cb382948f8d9d7c765330356054',
412 412 '6c2303a793671e807d1cfc70134c9ca0767d98c2',
413 413 '54386793436c938cff89326944d4c2702340037d',
414 414 '54000345d2e78b03a99d561399e8e548de3f3203',
415 415 '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
416 416 '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
417 417 '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
418 418 '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
419 419 'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
420 420 '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
421 421 '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
422 422 '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
423 423 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
424 424 'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
425 425 'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
426 426 'f15c21f97864b4f071cddfbf2750ec2e23859414',
427 427 'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
428 428 'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
429 429 '84dec09632a4458f79f50ddbbd155506c460b4f9',
430 430 '0115510b70c7229dbc5dc49036b32e7d91d23acd',
431 431 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
432 432 '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
433 433 'b8d04012574729d2c29886e53b1a43ef16dd00a1',
434 434 '6970b057cffe4aab0a792aa634c89f4bebf01441',
435 435 'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
436 436 'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
437 437 ],
438 438 'vcs/backends/git.py': [
439 439 '4cf116ad5a457530381135e2f4c453e68a1b0105',
440 440 '9a751d84d8e9408e736329767387f41b36935153',
441 441 'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
442 442 '428f81bb652bcba8d631bce926e8834ff49bdcc6',
443 443 '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
444 444 '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
445 445 '50e08c506174d8645a4bb517dd122ac946a0f3bf',
446 446 '54000345d2e78b03a99d561399e8e548de3f3203',
447 447 ],
448 448 }
449 449 for path, revs in files.items():
450 450 node = self.repo.get_changeset(revs[0]).get_node(path)
451 451 node_revs = [chset.raw_id for chset in node.history]
452 452             assert set(revs).issubset(set(node_revs)), "We assumed that %s is a subset of the revisions for which file %s " \
453 453 "has been changed, and history of that node returned: %s" \
454 454 % (revs, path, node_revs)
455 455
456 456 def test_file_annotate(self):
457 457 files = {
458 458 'vcs/backends/__init__.py': {
459 459 'c1214f7e79e02fc37156ff215cd71275450cffc3': {
460 460 'lines_no': 1,
461 461 'changesets': [
462 462 'c1214f7e79e02fc37156ff215cd71275450cffc3',
463 463 ],
464 464 },
465 465 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
466 466 'lines_no': 21,
467 467 'changesets': [
468 468 '49d3fd156b6f7db46313fac355dca1a0b94a0017',
469 469 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
470 470 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
471 471 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
472 472 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
473 473 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
474 474 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
475 475 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
476 476 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
477 477 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
478 478 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
479 479 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
480 480 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
481 481 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
482 482 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
483 483 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
484 484 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
485 485 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
486 486 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
487 487 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
488 488 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
489 489 ],
490 490 },
491 491 'e29b67bd158580fc90fc5e9111240b90e6e86064': {
492 492 'lines_no': 32,
493 493 'changesets': [
494 494 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
495 495 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
496 496 '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
497 497 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
498 498 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
499 499 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
500 500 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
501 501 '54000345d2e78b03a99d561399e8e548de3f3203',
502 502 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
503 503 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
504 504 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
505 505 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
506 506 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
507 507 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
508 508 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
509 509 '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
510 510 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
511 511 '78c3f0c23b7ee935ec276acb8b8212444c33c396',
512 512 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
513 513 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
514 514 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
515 515 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
516 516 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
517 517 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
518 518 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
519 519 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
520 520 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
521 521 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
522 522 '992f38217b979d0b0987d0bae3cc26dac85d9b19',
523 523 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
524 524 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
525 525 '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
526 526 ],
527 527 },
528 528 },
529 529 }
530 530
531 531 for fname, revision_dict in files.items():
532 532 for rev, data in revision_dict.items():
533 533 cs = self.repo.get_changeset(rev)
534 534
535 535 l1_1 = [x[1] for x in cs.get_file_annotate(fname)]
536 536 l1_2 = [x[2]().raw_id for x in cs.get_file_annotate(fname)]
537 537 assert l1_1 == l1_2
538 538 l1 = l1_1
539 539 l2 = files[fname][rev]['changesets']
540 540             assert l1 == l2, "The lists of revisions for %s@rev %s " \
541 541                 "from the annotation list should match each other, " \
542 542 "got \n%s \nvs \n%s " % (fname, rev, l1, l2)
543 543
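From the indexing used in the loop above, each entry returned by get_file_annotate() carries the changeset id at position 1 and a zero-argument callable at position 2 that lazily loads the corresponding changeset; the test does not rely on position 0. A hedged sketch of that reading, reusing names imported in this test module:

    repo = GitRepository(TEST_GIT_REPO)
    cs = repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3')
    for entry in cs.get_file_annotate('vcs/backends/__init__.py'):
        # the recorded id matches the lazily loaded changeset, as the test asserts
        assert entry[1] == entry[2]().raw_id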
544 544 def test_files_state(self):
545 545 """
546 546 Tests state of FileNodes.
547 547 """
548 548 node = self.repo \
549 549 .get_changeset('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0') \
550 550 .get_node('vcs/utils/diffs.py')
551 551         assert node.state == NodeState.ADDED
552 552 assert node.added
553 553 assert not node.changed
554 554 assert not node.not_changed
555 555 assert not node.removed
556 556
557 557 node = self.repo \
558 558 .get_changeset('33fa3223355104431402a888fa77a4e9956feb3e') \
559 559 .get_node('.hgignore')
560 560         assert node.state == NodeState.CHANGED
561 561 assert not node.added
562 562 assert node.changed
563 563 assert not node.not_changed
564 564 assert not node.removed
565 565
566 566 node = self.repo \
567 567 .get_changeset('e29b67bd158580fc90fc5e9111240b90e6e86064') \
568 568 .get_node('setup.py')
569 569         assert node.state == NodeState.NOT_CHANGED
570 570 assert not node.added
571 571 assert not node.changed
572 572 assert node.not_changed
573 573 assert not node.removed
574 574
575 575         # If a node has the REMOVED state then trying to fetch it raises
576 576         # a NodeDoesNotExistError exception
577 577 chset = self.repo.get_changeset(
578 578 'fa6600f6848800641328adbf7811fd2372c02ab2')
579 579 path = 'vcs/backends/BaseRepository.py'
580 580 with pytest.raises(NodeDoesNotExistError):
581 581 chset.get_node(path)
582 582 # but it would be one of ``removed`` (changeset's attribute)
583 583 assert path in [rf.path for rf in chset.removed]
584 584
585 585 chset = self.repo.get_changeset(
586 586 '54386793436c938cff89326944d4c2702340037d')
587 587 changed = ['setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
588 588 'vcs/nodes.py']
589 589 assert set(changed) == set([f.path for f in chset.changed])
590 590
591 591 def test_commit_message_is_unicode(self):
592 592 for cs in self.repo:
593 assert type(cs.message) == unicode
593 assert isinstance(cs.message, unicode)
594 594
595 595 def test_changeset_author_is_unicode(self):
596 596 for cs in self.repo:
597 assert type(cs.author) == unicode
597 assert isinstance(cs.author, unicode)
598 598
599 599 def test_repo_files_content_is_unicode(self):
600 600 changeset = self.repo.get_changeset()
601 601 for node in changeset.get_node('/'):
602 602 if node.is_file():
603 assert type(node.content) == unicode
603 assert isinstance(node.content, unicode)
604 604
605 605 def test_wrong_path(self):
606 606 # There is 'setup.py' in the root dir but not there:
607 607 path = 'foo/bar/setup.py'
608 608 tip = self.repo.get_changeset()
609 609 with pytest.raises(VCSError):
610 610 tip.get_node(path)
611 611
612 612 def test_author_email(self):
613 613 assert 'marcin@python-blog.com' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_email
614 614 assert 'lukasz.balcerzak@python-center.pl' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_email
615 615 assert '' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_email
616 616
617 617 def test_author_username(self):
618 618 assert 'Marcin Kuzminski' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_name
619 619 assert 'Lukasz Balcerzak' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_name
620 620 assert 'marcink none@none' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_name
621 621
622 622
623 623 class TestGitSpecific(object):
624 624
625 625 def test_error_is_raised_for_added_if_diff_name_status_is_wrong(self):
626 626 repo = mock.MagicMock()
627 627 changeset = GitChangeset(repo, 'foobar')
628 628 changeset._diff_name_status = 'foobar'
629 629 with pytest.raises(VCSError):
630 630 changeset.added
631 631
632 632 def test_error_is_raised_for_changed_if_diff_name_status_is_wrong(self):
633 633 repo = mock.MagicMock()
634 634 changeset = GitChangeset(repo, 'foobar')
635 635 changeset._diff_name_status = 'foobar'
636 636 with pytest.raises(VCSError):
637 637 changeset.added
638 638
639 639 def test_error_is_raised_for_removed_if_diff_name_status_is_wrong(self):
640 640 repo = mock.MagicMock()
641 641 changeset = GitChangeset(repo, 'foobar')
642 642 changeset._diff_name_status = 'foobar'
643 643 with pytest.raises(VCSError):
644 644 changeset.added
645 645
646 646
647 647 class TestGitSpecificWithRepo(_BackendTestMixin):
648 648 backend_alias = 'git'
649 649
650 650 @classmethod
651 651 def _get_commits(cls):
652 652 return [
653 653 {
654 654 'message': 'Initial',
655 655 'author': 'Joe Doe <joe.doe@example.com>',
656 656 'date': datetime.datetime(2010, 1, 1, 20),
657 657 'added': [
658 658 FileNode('foobar/static/js/admin/base.js', content='base'),
659 659 FileNode('foobar/static/admin', content='admin',
660 660 mode=0120000), # this is a link
661 661 FileNode('foo', content='foo'),
662 662 ],
663 663 },
664 664 {
665 665 'message': 'Second',
666 666 'author': 'Joe Doe <joe.doe@example.com>',
667 667 'date': datetime.datetime(2010, 1, 1, 22),
668 668 'added': [
669 669 FileNode('foo2', content='foo2'),
670 670 ],
671 671 },
672 672 ]
673 673
674 674 def test_paths_slow_traversing(self):
675 675 cs = self.repo.get_changeset()
676 676 assert cs.get_node('foobar').get_node('static').get_node('js').get_node('admin').get_node('base.js').content == 'base'
677 677
678 678 def test_paths_fast_traversing(self):
679 679 cs = self.repo.get_changeset()
680 680 assert cs.get_node('foobar/static/js/admin/base.js').content == 'base'
681 681
682 682 def test_workdir_get_branch(self):
683 683 self.repo.run_git_command(['checkout', '-b', 'production'])
684 684         # Regression test: one of the following would fail if we don't
685 685         # check the .git/HEAD file
686 686 self.repo.run_git_command(['checkout', 'production'])
687 687 assert self.repo.workdir.get_branch() == 'production'
688 688 self.repo.run_git_command(['checkout', 'master'])
689 689 assert self.repo.workdir.get_branch() == 'master'
690 690
691 691 def test_get_diff_runs_git_command_with_hashes(self):
692 692 self.repo.run_git_command = mock.Mock(return_value=['', ''])
693 693 self.repo.get_diff(0, 1)
694 694 self.repo.run_git_command.assert_called_once_with(
695 695 ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
696 696 self.repo._get_revision(0), self.repo._get_revision(1)])
697 697
698 698 def test_get_diff_runs_git_command_with_str_hashes(self):
699 699 self.repo.run_git_command = mock.Mock(return_value=['', ''])
700 700 self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1)
701 701 self.repo.run_git_command.assert_called_once_with(
702 702 ['show', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
703 703 self.repo._get_revision(1)])
704 704
705 705 def test_get_diff_runs_git_command_with_path_if_its_given(self):
706 706 self.repo.run_git_command = mock.Mock(return_value=['', ''])
707 707 self.repo.get_diff(0, 1, 'foo')
708 708 self.repo.run_git_command.assert_called_once_with(
709 709 ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
710 710 self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'])
711 711
712 712 def test_get_diff_does_not_sanitize_valid_context(self):
713 713 almost_overflowed_long_int = 2**31-1
714 714
715 715 self.repo.run_git_command = mock.Mock(return_value=['', ''])
716 716 self.repo.get_diff(0, 1, 'foo', context=almost_overflowed_long_int)
717 717 self.repo.run_git_command.assert_called_once_with(
718 718 ['diff', '-U' + str(almost_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
719 719 self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'])
720 720
721 721 def test_get_diff_sanitizes_overflowing_context(self):
722 722 overflowed_long_int = 2**31
723 723 sanitized_overflowed_long_int = overflowed_long_int-1
724 724
725 725 self.repo.run_git_command = mock.Mock(return_value=['', ''])
726 726 self.repo.get_diff(0, 1, 'foo', context=overflowed_long_int)
727 727
728 728 self.repo.run_git_command.assert_called_once_with(
729 729 ['diff', '-U' + str(sanitized_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
730 730 self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'])
731 731
732 732 def test_get_diff_does_not_sanitize_zero_context(self):
733 733 zero_context = 0
734 734
735 735 self.repo.run_git_command = mock.Mock(return_value=['', ''])
736 736 self.repo.get_diff(0, 1, 'foo', context=zero_context)
737 737
738 738 self.repo.run_git_command.assert_called_once_with(
739 739 ['diff', '-U' + str(zero_context), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
740 740 self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'])
741 741
742 742 def test_get_diff_sanitizes_negative_context(self):
743 743 negative_context = -10
744 744
745 745 self.repo.run_git_command = mock.Mock(return_value=['', ''])
746 746 self.repo.get_diff(0, 1, 'foo', context=negative_context)
747 747
748 748 self.repo.run_git_command.assert_called_once_with(
749 749 ['diff', '-U0', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
750 750 self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'])
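Taken together, the four context tests pin down a clamping rule for the -U value passed to git: values in [0, 2**31 - 1] pass through unchanged, larger values are capped, and negative values become 0. A minimal sketch of that rule (the backend's actual helper is not shown here, so the name is an assumption):

    def clamp_diff_context(context):
        # keep 'git diff -U<context>' within the range the tests above expect
        max_context = 2 ** 31 - 1
        return max(0, min(context, max_context))

    assert clamp_diff_context(-10) == 0
    assert clamp_diff_context(0) == 0
    assert clamp_diff_context(2 ** 31 - 1) == 2 ** 31 - 1
    assert clamp_diff_context(2 ** 31) == 2 ** 31 - 1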
751 751
752 752
753 753 class TestGitRegression(_BackendTestMixin):
754 754 backend_alias = 'git'
755 755
756 756 @classmethod
757 757 def _get_commits(cls):
758 758 return [
759 759 {
760 760 'message': 'Initial',
761 761 'author': 'Joe Doe <joe.doe@example.com>',
762 762 'date': datetime.datetime(2010, 1, 1, 20),
763 763 'added': [
764 764 FileNode('bot/__init__.py', content='base'),
765 765 FileNode('bot/templates/404.html', content='base'),
766 766 FileNode('bot/templates/500.html', content='base'),
767 767 ],
768 768 },
769 769 {
770 770 'message': 'Second',
771 771 'author': 'Joe Doe <joe.doe@example.com>',
772 772 'date': datetime.datetime(2010, 1, 1, 22),
773 773 'added': [
774 774 FileNode('bot/build/migrations/1.py', content='foo2'),
775 775 FileNode('bot/build/migrations/2.py', content='foo2'),
776 776 FileNode('bot/build/static/templates/f.html', content='foo2'),
777 777 FileNode('bot/build/static/templates/f1.html', content='foo2'),
778 778 FileNode('bot/build/templates/err.html', content='foo2'),
779 779 FileNode('bot/build/templates/err2.html', content='foo2'),
780 780 ],
781 781 },
782 782 ]
783 783
784 784 def test_similar_paths(self):
785 785 cs = self.repo.get_changeset()
786 786 paths = lambda *n: [x.path for x in n]
787 787 assert paths(*cs.get_nodes('bot')) == ['bot/build', 'bot/templates', 'bot/__init__.py']
788 788 assert paths(*cs.get_nodes('bot/build')) == ['bot/build/migrations', 'bot/build/static', 'bot/build/templates']
789 789 assert paths(*cs.get_nodes('bot/build/static')) == ['bot/build/static/templates']
790 790         # this get_nodes call below is the one that causes trouble!
791 791 assert paths(*cs.get_nodes('bot/build/static/templates')) == ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html']
792 792 assert paths(*cs.get_nodes('bot/build/templates')) == ['bot/build/templates/err.html', 'bot/build/templates/err2.html']
793 793 assert paths(*cs.get_nodes('bot/templates/')) == ['bot/templates/404.html', 'bot/templates/500.html']
794 794
795 795
796 796 class TestGitHooks(object):
797 797 """
798 798 Tests related to hook functionality of Git repositories.
799 799 """
800 800
801 801 def setup_method(self):
802 802 # For each run we want a fresh repo.
803 803 self.repo_directory = get_new_dir("githookrepo")
804 804 self.repo = GitRepository(self.repo_directory, create=True)
805 805
806 806 # Create a dictionary where keys are hook names, and values are paths to
807 807 # them. Deduplicates code in tests a bit.
808 808 self.hook_directory = self.repo.get_hook_location()
809 809 self.kallithea_hooks = dict((h, os.path.join(self.hook_directory, h)) for h in ("pre-receive", "post-receive"))
810 810
811 811 def test_hooks_created_if_missing(self):
812 812 """
813 813 Tests if hooks are installed in repository if they are missing.
814 814 """
815 815
816 816 for hook, hook_path in self.kallithea_hooks.iteritems():
817 817 if os.path.exists(hook_path):
818 818 os.remove(hook_path)
819 819
820 820 ScmModel().install_git_hooks(repo=self.repo)
821 821
822 822 for hook, hook_path in self.kallithea_hooks.iteritems():
823 823 assert os.path.exists(hook_path)
824 824
825 825 def test_kallithea_hooks_updated(self):
826 826 """
827 827 Tests if hooks are updated if they are Kallithea hooks already.
828 828 """
829 829
830 830 for hook, hook_path in self.kallithea_hooks.iteritems():
831 831 with open(hook_path, "w") as f:
832 832 f.write("KALLITHEA_HOOK_VER=0.0.0\nJUST_BOGUS")
833 833
834 834 ScmModel().install_git_hooks(repo=self.repo)
835 835
836 836 for hook, hook_path in self.kallithea_hooks.iteritems():
837 837 with open(hook_path) as f:
838 838 assert "JUST_BOGUS" not in f.read()
839 839
840 840 def test_custom_hooks_untouched(self):
841 841 """
842 842 Tests if hooks are left untouched if they are not Kallithea hooks.
843 843 """
844 844
845 845 for hook, hook_path in self.kallithea_hooks.iteritems():
846 846 with open(hook_path, "w") as f:
847 847 f.write("#!/bin/bash\n#CUSTOM_HOOK")
848 848
849 849 ScmModel().install_git_hooks(repo=self.repo)
850 850
851 851 for hook, hook_path in self.kallithea_hooks.iteritems():
852 852 with open(hook_path) as f:
853 853 assert "CUSTOM_HOOK" in f.read()
854 854
855 855 def test_custom_hooks_forced_update(self):
856 856 """
857 857 Tests if hooks are forcefully updated even though they are custom hooks.
858 858 """
859 859
860 860 for hook, hook_path in self.kallithea_hooks.iteritems():
861 861 with open(hook_path, "w") as f:
862 862 f.write("#!/bin/bash\n#CUSTOM_HOOK")
863 863
864 864 ScmModel().install_git_hooks(repo=self.repo, force_create=True)
865 865
866 866 for hook, hook_path in self.kallithea_hooks.iteritems():
867 867 with open(hook_path) as f:
868 868 assert "KALLITHEA_HOOK_VER" in f.read()
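The four hook tests above amount to a simple decision rule for ScmModel().install_git_hooks(): a missing hook is always written, an existing hook that identifies itself via KALLITHEA_HOOK_VER is rewritten, and any other (custom) hook is left alone unless force_create is passed. A condensed sketch of that rule, not the actual implementation:

    import os

    def should_write_hook(hook_path, force_create=False):
        if not os.path.exists(hook_path):
            return True  # missing: install it
        with open(hook_path) as f:
            if 'KALLITHEA_HOOK_VER' in f.read():
                return True  # an existing Kallithea hook: safe to refresh
        return force_create  # custom hook: only replace when explicitly forced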
@@ -1,592 +1,592 b''
1 1 import os
2 2
3 3 import mock
4 4 import pytest
5 5
6 6 from kallithea.lib.utils2 import safe_str
7 7 from kallithea.lib.vcs.backends.hg import MercurialChangeset, MercurialRepository
8 8 from kallithea.lib.vcs.exceptions import NodeDoesNotExistError, RepositoryError, VCSError
9 9 from kallithea.lib.vcs.nodes import NodeKind, NodeState
10 10 from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_HG_REPO_CLONE, TEST_HG_REPO_PULL, TESTS_TMP_PATH
11 11
12 12
13 13 class TestMercurialRepository(object):
14 14
15 15 def __check_for_existing_repo(self):
16 16 if os.path.exists(TEST_HG_REPO_CLONE):
17 17 pytest.fail('Cannot test mercurial clone repo as location %s already '
18 18 'exists. You should manually remove it first.'
19 19 % TEST_HG_REPO_CLONE)
20 20
21 21 def setup_method(self):
22 22 self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
23 23
24 24 def test_wrong_repo_path(self):
25 25 wrong_repo_path = os.path.join(TESTS_TMP_PATH, 'errorrepo')
26 26 with pytest.raises(RepositoryError):
27 27 MercurialRepository(wrong_repo_path)
28 28
29 29 def test_unicode_path_repo(self):
30 30 with pytest.raises(VCSError):
31 31 MercurialRepository(u'iShouldFail')
32 32
33 33 def test_repo_clone(self):
34 34 self.__check_for_existing_repo()
35 35 repo = MercurialRepository(safe_str(TEST_HG_REPO))
36 36 repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
37 37 src_url=TEST_HG_REPO, update_after_clone=True)
38 38 assert len(repo.revisions) == len(repo_clone.revisions)
39 39 # Checking hashes of changesets should be enough
40 40 for changeset in repo.get_changesets():
41 41 raw_id = changeset.raw_id
42 42 assert raw_id == repo_clone.get_changeset(raw_id).raw_id
43 43
44 44 def test_repo_clone_with_update(self):
45 45 repo = MercurialRepository(safe_str(TEST_HG_REPO))
46 46 repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update',
47 47 src_url=TEST_HG_REPO, update_after_clone=True)
48 48 assert len(repo.revisions) == len(repo_clone.revisions)
49 49
50 50 # check if current workdir was updated
51 51 assert os.path.isfile(
52 52 os.path.join(
53 53 TEST_HG_REPO_CLONE + '_w_update', 'MANIFEST.in'
54 54 )
55 55 )
56 56
57 57 def test_repo_clone_without_update(self):
58 58 repo = MercurialRepository(safe_str(TEST_HG_REPO))
59 59 repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update',
60 60 src_url=TEST_HG_REPO, update_after_clone=False)
61 61 assert len(repo.revisions) == len(repo_clone.revisions)
62 62 assert not os.path.isfile(
63 63 os.path.join(
64 64 TEST_HG_REPO_CLONE + '_wo_update', 'MANIFEST.in'
65 65 )
66 66 )
67 67
68 68 def test_pull(self):
69 69 if os.path.exists(TEST_HG_REPO_PULL):
70 70 pytest.fail('Cannot test mercurial pull command as location %s '
71 71 'already exists. You should manually remove it first'
72 72 % TEST_HG_REPO_PULL)
73 73 repo_new = MercurialRepository(TEST_HG_REPO_PULL, create=True)
74 74 assert len(self.repo.revisions) > len(repo_new.revisions)
75 75
76 76 repo_new.pull(self.repo.path)
77 77 repo_new = MercurialRepository(TEST_HG_REPO_PULL)
78 78 assert len(self.repo.revisions) == len(repo_new.revisions)
79 79
80 80 def test_revisions(self):
81 81 # there are 21 revisions at bitbucket now
82 82 # so we can assume they would be available from now on
83 83 subset = set(['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
84 84 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
85 85 '6cba7170863a2411822803fa77a0a264f1310b35',
86 86 '56349e29c2af3ac913b28bde9a2c6154436e615b',
87 87 '2dda4e345facb0ccff1a191052dd1606dba6781d',
88 88 '6fff84722075f1607a30f436523403845f84cd9e',
89 89 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
90 90 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
91 91 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
92 92 'be90031137367893f1c406e0a8683010fd115b79',
93 93 'db8e58be770518cbb2b1cdfa69146e47cd481481',
94 94 '84478366594b424af694a6c784cb991a16b87c21',
95 95 '17f8e105dddb9f339600389c6dc7175d395a535c',
96 96 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
97 97 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
98 98 '786facd2c61deb9cf91e9534735124fb8fc11842',
99 99 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
100 100 'aa6a0de05b7612707db567078e130a6cd114a9a7',
101 101 'eada5a770da98ab0dd7325e29d00e0714f228d09'
102 102 ])
103 103 assert subset.issubset(set(self.repo.revisions))
104 104
105 105 # check if we have the proper order of revisions
106 106 org = ['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
107 107 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
108 108 '6cba7170863a2411822803fa77a0a264f1310b35',
109 109 '56349e29c2af3ac913b28bde9a2c6154436e615b',
110 110 '2dda4e345facb0ccff1a191052dd1606dba6781d',
111 111 '6fff84722075f1607a30f436523403845f84cd9e',
112 112 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
113 113 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
114 114 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
115 115 'be90031137367893f1c406e0a8683010fd115b79',
116 116 'db8e58be770518cbb2b1cdfa69146e47cd481481',
117 117 '84478366594b424af694a6c784cb991a16b87c21',
118 118 '17f8e105dddb9f339600389c6dc7175d395a535c',
119 119 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
120 120 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
121 121 '786facd2c61deb9cf91e9534735124fb8fc11842',
122 122 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
123 123 'aa6a0de05b7612707db567078e130a6cd114a9a7',
124 124 'eada5a770da98ab0dd7325e29d00e0714f228d09',
125 125 '2c1885c735575ca478bf9e17b0029dca68824458',
126 126 'd9bcd465040bf869799b09ad732c04e0eea99fe9',
127 127 '469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7',
128 128 '4fb8326d78e5120da2c7468dcf7098997be385da',
129 129 '62b4a097164940bd66030c4db51687f3ec035eed',
130 130 '536c1a19428381cfea92ac44985304f6a8049569',
131 131 '965e8ab3c44b070cdaa5bf727ddef0ada980ecc4',
132 132 '9bb326a04ae5d98d437dece54be04f830cf1edd9',
133 133 'f8940bcb890a98c4702319fbe36db75ea309b475',
134 134 'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
135 135 '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
136 136 'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
137 137 assert org == self.repo.revisions[:31]
138 138
139 139 def test_iter_slice(self):
140 140 sliced = list(self.repo[:10])
141 141 itered = list(self.repo)[:10]
142 142 assert sliced == itered
143 143
144 144 def test_slicing(self):
145 145 # 4 1 5 10 95
146 146 for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
147 147 (10, 20, 10), (5, 100, 95)]:
148 148 revs = list(self.repo[sfrom:sto])
149 149 assert len(revs) == size
150 150 assert revs[0] == self.repo.get_changeset(sfrom)
151 151 assert revs[-1] == self.repo.get_changeset(sto - 1)
152 152
153 153 def test_branches(self):
154 154 # TODO: Need more tests here
155 155
156 156 # active branches
157 157 assert 'default' in self.repo.branches
158 158 assert 'stable' in self.repo.branches
159 159
160 160 # closed
161 161 assert 'git' in self.repo._get_branches(closed=True)
162 162 assert 'web' in self.repo._get_branches(closed=True)
163 163
164 164 for name, id in self.repo.branches.items():
165 165 assert isinstance(self.repo.get_changeset(id), MercurialChangeset)
166 166
167 167 def test_tip_in_tags(self):
168 168 # tip is always a tag
169 169 assert 'tip' in self.repo.tags
170 170
171 171 def test_tip_changeset_in_tags(self):
172 172 tip = self.repo.get_changeset()
173 173 assert self.repo.tags['tip'] == tip.raw_id
174 174
175 175 def test_initial_changeset(self):
176 176
177 177 init_chset = self.repo.get_changeset(0)
178 178 assert init_chset.message == 'initial import'
179 179 assert init_chset.author == 'Marcin Kuzminski <marcin@python-blog.com>'
180 180 assert sorted(init_chset._file_paths) == sorted([
181 181 'vcs/__init__.py',
182 182 'vcs/backends/BaseRepository.py',
183 183 'vcs/backends/__init__.py',
184 184 ])
185 185
186 186 assert sorted(init_chset._dir_paths) == sorted(['', 'vcs', 'vcs/backends'])
187 187
188 188 with pytest.raises(NodeDoesNotExistError):
189 189 init_chset.get_node(path='foobar')
190 190
191 191 node = init_chset.get_node('vcs/')
192 192 assert hasattr(node, 'kind')
193 193 assert node.kind == NodeKind.DIR
194 194
195 195 node = init_chset.get_node('vcs')
196 196 assert hasattr(node, 'kind')
197 197 assert node.kind == NodeKind.DIR
198 198
199 199 node = init_chset.get_node('vcs/__init__.py')
200 200 assert hasattr(node, 'kind')
201 201 assert node.kind == NodeKind.FILE
202 202
203 203 def test_not_existing_changeset(self):
204 204 # rawid
205 205 with pytest.raises(RepositoryError):
206 206 self.repo.get_changeset('abcd' * 10)
207 207 # shortid
208 208 with pytest.raises(RepositoryError):
209 209 self.repo.get_changeset('erro' * 4)
210 210 # numeric
211 211 with pytest.raises(RepositoryError):
212 212 self.repo.get_changeset(self.repo.count() + 1)
213 213
214 214 # Small chance we ever get to this one
215 215 revision = pow(2, 30)
216 216 with pytest.raises(RepositoryError):
217 217 self.repo.get_changeset(revision)
218 218
219 219 def test_changeset10(self):
220 220
221 221 chset10 = self.repo.get_changeset(10)
222 222 readme = """===
223 223 VCS
224 224 ===
225 225
226 226 Various Version Control System management abstraction layer for Python.
227 227
228 228 Introduction
229 229 ------------
230 230
231 231 TODO: To be written...
232 232
233 233 """
234 234 node = chset10.get_node('README.rst')
235 235 assert node.kind == NodeKind.FILE
236 236 assert node.content == readme
237 237
238 238 @mock.patch('kallithea.lib.vcs.backends.hg.repository.diffopts')
239 239 def test_get_diff_does_not_sanitize_zero_context(self, mock_diffopts):
240 240 zero_context = 0
241 241
242 242 self.repo.get_diff(0, 1, 'foo', context=zero_context)
243 243
244 244 mock_diffopts.assert_called_once_with(git=True, showfunc=True, ignorews=False, context=zero_context)
245 245
246 246 @mock.patch('kallithea.lib.vcs.backends.hg.repository.diffopts')
247 247 def test_get_diff_sanitizes_negative_context(self, mock_diffopts):
248 248 negative_context = -10
249 249 zero_context = 0
250 250
251 251 self.repo.get_diff(0, 1, 'foo', context=negative_context)
252 252
253 253 mock_diffopts.assert_called_once_with(git=True, showfunc=True, ignorews=False, context=zero_context)
254 254
255 255
256 256 class TestMercurialChangeset(object):
257 257
258 258 def setup_method(self):
259 259 self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
260 260
261 261 def _test_equality(self, changeset):
262 262 revision = changeset.revision
263 263 assert changeset == self.repo.get_changeset(revision)
264 264
265 265 def test_equality(self):
266 266 revs = [0, 10, 20]
267 267 changesets = [self.repo.get_changeset(rev) for rev in revs]
268 268 for changeset in changesets:
269 269 self._test_equality(changeset)
270 270
271 271 def test_default_changeset(self):
272 272 tip = self.repo.get_changeset('tip')
273 273 assert tip == self.repo.get_changeset()
274 274 assert tip == self.repo.get_changeset(revision=None)
275 275 assert tip == list(self.repo[-1:])[0]
276 276
277 277 def test_root_node(self):
278 278 tip = self.repo.get_changeset('tip')
279 279 assert tip.root is tip.get_node('')
280 280
281 281 def test_lazy_fetch(self):
282 282 """
283 283         Test that a changeset's nodes expand and are cached as we walk through
284 284         the revision. This test is somewhat hard to write as the order of
285 285         operations is key here. Written by running command after command in a shell.
286 286 """
287 287 chset = self.repo.get_changeset(45)
288 288 assert len(chset.nodes) == 0
289 289 root = chset.root
290 290 assert len(chset.nodes) == 1
291 291 assert len(root.nodes) == 8
292 292 # accessing root.nodes updates chset.nodes
293 293 assert len(chset.nodes) == 9
294 294
295 295 docs = root.get_node('docs')
296 296 # we haven't yet accessed anything new as docs dir was already cached
297 297 assert len(chset.nodes) == 9
298 298 assert len(docs.nodes) == 8
299 299 # accessing docs.nodes updates chset.nodes
300 300 assert len(chset.nodes) == 17
301 301
302 302 assert docs is chset.get_node('docs')
303 303 assert docs is root.nodes[0]
304 304 assert docs is root.dirs[0]
305 305 assert docs is chset.get_node('docs')
306 306
307 307 def test_nodes_with_changeset(self):
308 308 chset = self.repo.get_changeset(45)
309 309 root = chset.root
310 310 docs = root.get_node('docs')
311 311 assert docs is chset.get_node('docs')
312 312 api = docs.get_node('api')
313 313 assert api is chset.get_node('docs/api')
314 314 index = api.get_node('index.rst')
315 315 assert index is chset.get_node('docs/api/index.rst')
316 316 assert index is chset.get_node('docs').get_node('api').get_node('index.rst')
317 317
318 318 def test_branch_and_tags(self):
319 319 chset0 = self.repo.get_changeset(0)
320 320 assert chset0.branch == 'default'
321 321 assert chset0.branches == ['default']
322 322 assert chset0.tags == []
323 323
324 324 chset10 = self.repo.get_changeset(10)
325 325 assert chset10.branch == 'default'
326 326 assert chset10.branches == ['default']
327 327 assert chset10.tags == []
328 328
329 329 chset44 = self.repo.get_changeset(44)
330 330 assert chset44.branch == 'web'
331 331 assert chset44.branches == ['web']
332 332
333 333 tip = self.repo.get_changeset('tip')
334 334 assert 'tip' in tip.tags
335 335
336 336 def _test_file_size(self, revision, path, size):
337 337 node = self.repo.get_changeset(revision).get_node(path)
338 338 assert node.is_file()
339 339 assert node.size == size
340 340
341 341 def test_file_size(self):
342 342 to_check = (
343 343 (10, 'setup.py', 1068),
344 344 (20, 'setup.py', 1106),
345 345 (60, 'setup.py', 1074),
346 346
347 347 (10, 'vcs/backends/base.py', 2921),
348 348 (20, 'vcs/backends/base.py', 3936),
349 349 (60, 'vcs/backends/base.py', 6189),
350 350 )
351 351 for revision, path, size in to_check:
352 352 self._test_file_size(revision, path, size)
353 353
354 354 def _test_dir_size(self, revision, path, size):
355 355 node = self.repo.get_changeset(revision).get_node(path)
356 356 assert not node.is_file()
357 357 assert node.size == size
358 358
359 359 def test_dir_size(self):
360 360 to_check = (
361 361 ('96507bd11ecc', '/', 682421),
362 362 ('a53d9201d4bc', '/', 682410),
363 363 ('90243de06161', '/', 682006),
364 364 )
365 365 for revision, path, size in to_check:
366 366 self._test_dir_size(revision, path, size)
367 367
368 368 def test_repo_size(self):
369 369 assert self.repo.size == 682421
370 370
371 371 def test_file_history(self):
372 372 # we can only check if those revisions are present in the history
373 373         # as we cannot update this test every time the file is changed
374 374 files = {
375 375 'setup.py': [7, 18, 45, 46, 47, 69, 77],
376 376 'vcs/nodes.py': [7, 8, 24, 26, 30, 45, 47, 49, 56, 57, 58, 59, 60,
377 377 61, 73, 76],
378 378 'vcs/backends/hg.py': [4, 5, 6, 11, 12, 13, 14, 15, 16, 21, 22, 23,
379 379 26, 27, 28, 30, 31, 33, 35, 36, 37, 38, 39, 40, 41, 44, 45, 47,
380 380 48, 49, 53, 54, 55, 58, 60, 61, 67, 68, 69, 70, 73, 77, 78, 79,
381 381 82],
382 382 }
383 383 for path, revs in files.items():
384 384 tip = self.repo.get_changeset(revs[-1])
385 385 node = tip.get_node(path)
386 386 node_revs = [chset.revision for chset in node.history]
387 387 assert set(revs).issubset(set(node_revs)), \
388 388                     "We assumed that %s is a subset of the revisions for which file %s " \
389 389 "has been changed, and history of that node returned: %s" \
390 390 % (revs, path, node_revs)
391 391
392 392 def test_file_annotate(self):
393 393 files = {
394 394 'vcs/backends/__init__.py':
395 395 {89: {'lines_no': 31,
396 396 'changesets': [32, 32, 61, 32, 32, 37, 32, 32, 32, 44,
397 397 37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
398 398 32, 32, 32, 32, 37, 32, 37, 37, 32,
399 399 32, 32]},
400 400 20: {'lines_no': 1,
401 401 'changesets': [4]},
402 402 55: {'lines_no': 31,
403 403 'changesets': [32, 32, 45, 32, 32, 37, 32, 32, 32, 44,
404 404 37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
405 405 32, 32, 32, 32, 37, 32, 37, 37, 32,
406 406 32, 32]}},
407 407 'vcs/exceptions.py':
408 408 {89: {'lines_no': 18,
409 409 'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
410 410 16, 16, 17, 16, 16, 18, 18, 18]},
411 411 20: {'lines_no': 18,
412 412 'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
413 413 16, 16, 17, 16, 16, 18, 18, 18]},
414 414 55: {'lines_no': 18, 'changesets': [16, 16, 16, 16, 16, 16,
415 415 16, 16, 16, 16, 16, 16,
416 416 17, 16, 16, 18, 18, 18]}},
417 417 'MANIFEST.in': {89: {'lines_no': 5,
418 418 'changesets': [7, 7, 7, 71, 71]},
419 419 20: {'lines_no': 3,
420 420 'changesets': [7, 7, 7]},
421 421 55: {'lines_no': 3,
422 422 'changesets': [7, 7, 7]}}}
423 423
424 424 for fname, revision_dict in files.items():
425 425 for rev, data in revision_dict.items():
426 426 cs = self.repo.get_changeset(rev)
427 427 l1_1 = [x[1] for x in cs.get_file_annotate(fname)]
428 428 l1_2 = [x[2]().raw_id for x in cs.get_file_annotate(fname)]
429 429 assert l1_1 == l1_2
430 430                 l1 = [x[2]().revision for x in cs.get_file_annotate(fname)]
431 431 l2 = files[fname][rev]['changesets']
432 432                 assert l1 == l2, "The lists of revisions for %s@rev %s " \
433 433                     "from the annotation list should match each other, " \
434 434 "got \n%s \nvs \n%s " % (fname, rev, l1, l2)
435 435
436 436 def test_changeset_state(self):
437 437 """
438 438 Tests which files have been added/changed/removed at particular revision
439 439 """
440 440
441 441 # rev 46ad32a4f974:
442 442 # hg st --rev 46ad32a4f974
443 443 # changed: 13
444 444 # added: 20
445 445 # removed: 1
446 446 changed = set(['.hgignore'
447 447 , 'README.rst', 'docs/conf.py', 'docs/index.rst', 'setup.py'
448 448 , 'tests/test_hg.py', 'tests/test_nodes.py', 'vcs/__init__.py'
449 449 , 'vcs/backends/__init__.py', 'vcs/backends/base.py'
450 450 , 'vcs/backends/hg.py', 'vcs/nodes.py', 'vcs/utils/__init__.py'])
451 451
452 452 added = set(['docs/api/backends/hg.rst'
453 453 , 'docs/api/backends/index.rst', 'docs/api/index.rst'
454 454 , 'docs/api/nodes.rst', 'docs/api/web/index.rst'
455 455 , 'docs/api/web/simplevcs.rst', 'docs/installation.rst'
456 456 , 'docs/quickstart.rst', 'setup.cfg', 'vcs/utils/baseui_config.py'
457 457 , 'vcs/utils/web.py', 'vcs/web/__init__.py', 'vcs/web/exceptions.py'
458 458 , 'vcs/web/simplevcs/__init__.py', 'vcs/web/simplevcs/exceptions.py'
459 459 , 'vcs/web/simplevcs/middleware.py', 'vcs/web/simplevcs/models.py'
460 460 , 'vcs/web/simplevcs/settings.py', 'vcs/web/simplevcs/utils.py'
461 461 , 'vcs/web/simplevcs/views.py'])
462 462
463 463 removed = set(['docs/api.rst'])
464 464
465 465 chset64 = self.repo.get_changeset('46ad32a4f974')
466 466 assert set((node.path for node in chset64.added)) == added
467 467 assert set((node.path for node in chset64.changed)) == changed
468 468 assert set((node.path for node in chset64.removed)) == removed
469 469
470 470 # rev b090f22d27d6:
471 471 # hg st --rev b090f22d27d6
472 472 # changed: 1
473 473 # added: 0
474 474 # removed: 0
475 475 chset88 = self.repo.get_changeset('b090f22d27d6')
476 476 assert set((node.path for node in chset88.added)) == set()
477 477 assert set((node.path for node in chset88.changed)) == set(['.hgignore'])
478 478 assert set((node.path for node in chset88.removed)) == set()
479 479
480 480 # 85:
481 481 # added: 2 ['vcs/utils/diffs.py', 'vcs/web/simplevcs/views/diffs.py']
482 482 # changed: 4 ['vcs/web/simplevcs/models.py', ...]
483 483 # removed: 1 ['vcs/utils/web.py']
484 484 chset85 = self.repo.get_changeset(85)
485 485 assert set((node.path for node in chset85.added)) == set([
486 486 'vcs/utils/diffs.py',
487 487 'vcs/web/simplevcs/views/diffs.py'
488 488 ])
489 489
490 490 assert set((node.path for node in chset85.changed)) == set([
491 491 'vcs/web/simplevcs/models.py',
492 492 'vcs/web/simplevcs/utils.py',
493 493 'vcs/web/simplevcs/views/__init__.py',
494 494 'vcs/web/simplevcs/views/repository.py',
495 495 ])
496 496
497 497 assert set((node.path for node in chset85.removed)) == set([
498 498 'vcs/utils/web.py'
499 499 ])
500 500
501 501
502 502 def test_files_state(self):
503 503 """
504 504 Tests state of FileNodes.
505 505 """
506 506 chset = self.repo.get_changeset(85)
507 507 node = chset.get_node('vcs/utils/diffs.py')
508 508 assert node.state == NodeState.ADDED
509 509 assert node.added
510 510 assert not node.changed
511 511 assert not node.not_changed
512 512 assert not node.removed
513 513
514 514 chset = self.repo.get_changeset(88)
515 515 node = chset.get_node('.hgignore')
516 516 assert node.state == NodeState.CHANGED
517 517 assert not node.added
518 518 assert node.changed
519 519 assert not node.not_changed
520 520 assert not node.removed
521 521
522 522 chset = self.repo.get_changeset(85)
523 523 node = chset.get_node('setup.py')
524 524 assert node.state == NodeState.NOT_CHANGED
525 525 assert not node.added
526 526 assert not node.changed
527 527 assert node.not_changed
528 528 assert not node.removed
529 529
530 530 # If a node has the REMOVED state, trying to fetch it raises
531 531 # NodeDoesNotExistError
532 532 chset = self.repo.get_changeset(2)
533 533 path = 'vcs/backends/BaseRepository.py'
534 534 with pytest.raises(NodeDoesNotExistError):
535 535 chset.get_node(path)
536 536 # but it would be one of ``removed`` (changeset's attribute)
537 537 # but it is still listed in the changeset's ``removed`` attribute
538 538
539 539 def test_commit_message_is_unicode(self):
540 540 for cm in self.repo:
541 assert type(cm.message) == unicode
541 assert isinstance(cm.message, unicode)
542 542
543 543 def test_changeset_author_is_unicode(self):
544 544 for cm in self.repo:
545 assert type(cm.author) == unicode
545 assert isinstance(cm.author, unicode)
546 546
547 547 def test_repo_files_content_is_unicode(self):
548 548 test_changeset = self.repo.get_changeset(100)
549 549 for node in test_changeset.get_node('/'):
550 550 if node.is_file():
551 assert type(node.content) == unicode
551 assert isinstance(node.content, unicode)
552 552
553 553 def test_wrong_path(self):
554 554 # 'setup.py' exists in the root dir, but not under foo/bar:
555 555 path = 'foo/bar/setup.py'
556 556 with pytest.raises(VCSError):
557 557 self.repo.get_changeset().get_node(path)
558 558
559 559 def test_archival_file(self):
560 560 # TODO:
561 561 pass
562 562
563 563 def test_archival_as_generator(self):
564 564 # TODO:
565 565 pass
566 566
567 567 def test_archival_wrong_kind(self):
568 568 tip = self.repo.get_changeset()
569 569 with pytest.raises(VCSError):
570 570 tip.fill_archive(kind='error')
571 571
572 572 def test_archival_empty_prefix(self):
573 573 # TODO:
574 574 pass
575 575
576 576 def test_author_email(self):
577 577 assert 'marcin@python-blog.com' == self.repo.get_changeset('b986218ba1c9').author_email
578 578 assert 'lukasz.balcerzak@python-center.pl' == self.repo.get_changeset('3803844fdbd3').author_email
579 579 assert '' == self.repo.get_changeset('84478366594b').author_email
580 580
581 581 def test_author_username(self):
582 582 assert 'Marcin Kuzminski' == self.repo.get_changeset('b986218ba1c9').author_name
583 583 assert 'Lukasz Balcerzak' == self.repo.get_changeset('3803844fdbd3').author_name
584 584 assert 'marcink' == self.repo.get_changeset('84478366594b').author_name
585 585
586 586 def test_successors(self):
587 587 init_chset = self.repo.get_changeset(0)
588 588 assert init_chset.successors == []
589 589
590 590 def test_predecessors(self):
591 591 init_chset = self.repo.get_changeset(0)
592 592 assert len(init_chset.predecessors) == 0
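
A short aside on the cleanup applied to the assertions above: isinstance() is generally preferable to comparing type() results, because it also accepts subclasses and can test against several types at once. A minimal illustrative sketch (Python 2, not part of the vcs test suite; the class name is hypothetical):

# Illustrative only -- demonstrates why isinstance() beats a type() comparison.

class Message(unicode):
    """A hypothetical unicode subclass, e.g. a lazily decoded commit message."""
    pass

msg = Message(u"fixed the build")

print(type(msg) == unicode)             # False: the exact type is Message
print(isinstance(msg, unicode))         # True: subclasses are accepted
print(isinstance(msg, (str, unicode)))  # True: several types can be checked at once

This is the reason the assertions above were switched from type(...) == unicode to isinstance(..., unicode).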