##// END OF EJS Templates
utils: switched age function to use lazy translated pyramid translation mechanism.
marcink -
r1317:3fc8c7de default
parent child Browse files
Show More
@@ -1,951 +1,952 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2011-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 """
23 23 Some simple helper functions
24 24 """
25 25
26 26
27 27 import collections
28 28 import datetime
29 29 import dateutil.relativedelta
30 30 import hashlib
31 31 import logging
32 32 import re
33 33 import sys
34 34 import time
35 35 import threading
36 36 import urllib
37 37 import urlobject
38 38 import uuid
39 39
40 40 import pygments.lexers
41 41 import sqlalchemy
42 42 import sqlalchemy.engine.url
43 43 import webob
44 44 import routes.util
45 45
46 46 import rhodecode
47 from rhodecode.translation import _, _pluralize
47 48
48 49
49 50 def md5(s):
50 51 return hashlib.md5(s).hexdigest()
51 52
52 53
53 54 def md5_safe(s):
54 55 return md5(safe_str(s))
55 56
56 57
57 58 def __get_lem(extra_mapping=None):
58 59 """
59 60 Get language extension map based on what's inside pygments lexers
60 61 """
61 62 d = collections.defaultdict(lambda: [])
62 63
63 64 def __clean(s):
64 65 s = s.lstrip('*')
65 66 s = s.lstrip('.')
66 67
67 68 if s.find('[') != -1:
68 69 exts = []
69 70 start, stop = s.find('['), s.find(']')
70 71
71 72 for suffix in s[start + 1:stop]:
72 73 exts.append(s[:s.find('[')] + suffix)
73 74 return [e.lower() for e in exts]
74 75 else:
75 76 return [s.lower()]
76 77
77 78 for lx, t in sorted(pygments.lexers.LEXERS.items()):
78 79 m = map(__clean, t[-2])
79 80 if m:
80 81 m = reduce(lambda x, y: x + y, m)
81 82 for ext in m:
82 83 desc = lx.replace('Lexer', '')
83 84 d[ext].append(desc)
84 85
85 86 data = dict(d)
86 87
87 88 extra_mapping = extra_mapping or {}
88 89 if extra_mapping:
89 90 for k, v in extra_mapping.items():
90 91 if k not in data:
91 92 # register new mapping2lexer
92 93 data[k] = [v]
93 94
94 95 return data
95 96
96 97
97 98 def str2bool(_str):
98 99 """
99 100 returns True/False value from given string, it tries to translate the
100 101 string into boolean
101 102
102 103 :param _str: string value to translate into boolean
103 104 :rtype: boolean
104 105 :returns: boolean from given string
105 106 """
106 107 if _str is None:
107 108 return False
108 109 if _str in (True, False):
109 110 return _str
110 111 _str = str(_str).strip().lower()
111 112 return _str in ('t', 'true', 'y', 'yes', 'on', '1')
112 113
113 114
114 115 def aslist(obj, sep=None, strip=True):
115 116 """
116 117 Returns given string separated by sep as list
117 118
118 119 :param obj:
119 120 :param sep:
120 121 :param strip:
121 122 """
122 123 if isinstance(obj, (basestring,)):
123 124 lst = obj.split(sep)
124 125 if strip:
125 126 lst = [v.strip() for v in lst]
126 127 return lst
127 128 elif isinstance(obj, (list, tuple)):
128 129 return obj
129 130 elif obj is None:
130 131 return []
131 132 else:
132 133 return [obj]
133 134
134 135
135 136 def convert_line_endings(line, mode):
136 137 """
137 138 Converts a given line "line end" accordingly to given mode
138 139
139 140 Available modes are::
140 141 0 - Unix
141 142 1 - Mac
142 143 2 - DOS
143 144
144 145 :param line: given line to convert
145 146 :param mode: mode to convert to
146 147 :rtype: str
147 148 :return: converted line according to mode
148 149 """
149 150 if mode == 0:
150 151 line = line.replace('\r\n', '\n')
151 152 line = line.replace('\r', '\n')
152 153 elif mode == 1:
153 154 line = line.replace('\r\n', '\r')
154 155 line = line.replace('\n', '\r')
155 156 elif mode == 2:
156 157 line = re.sub('\r(?!\n)|(?<!\r)\n', '\r\n', line)
157 158 return line
158 159
159 160
160 161 def detect_mode(line, default):
161 162 """
162 163 Detects line break for given line, if line break couldn't be found
163 164 given default value is returned
164 165
165 166 :param line: str line
166 167 :param default: default
167 168 :rtype: int
168 169 :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
169 170 """
170 171 if line.endswith('\r\n'):
171 172 return 2
172 173 elif line.endswith('\n'):
173 174 return 0
174 175 elif line.endswith('\r'):
175 176 return 1
176 177 else:
177 178 return default
178 179
179 180
180 181 def safe_int(val, default=None):
181 182 """
182 183 Returns int() of val if val is not convertable to int use default
183 184 instead
184 185
185 186 :param val:
186 187 :param default:
187 188 """
188 189
189 190 try:
190 191 val = int(val)
191 192 except (ValueError, TypeError):
192 193 val = default
193 194
194 195 return val
195 196
196 197
197 198 def safe_unicode(str_, from_encoding=None):
198 199 """
199 200 safe unicode function. Does few trick to turn str_ into unicode
200 201
201 202 In case of UnicodeDecode error, we try to return it with encoding detected
202 203 by chardet library if it fails fallback to unicode with errors replaced
203 204
204 205 :param str_: string to decode
205 206 :rtype: unicode
206 207 :returns: unicode object
207 208 """
208 209 if isinstance(str_, unicode):
209 210 return str_
210 211
211 212 if not from_encoding:
212 213 DEFAULT_ENCODINGS = aslist(rhodecode.CONFIG.get('default_encoding',
213 214 'utf8'), sep=',')
214 215 from_encoding = DEFAULT_ENCODINGS
215 216
216 217 if not isinstance(from_encoding, (list, tuple)):
217 218 from_encoding = [from_encoding]
218 219
219 220 try:
220 221 return unicode(str_)
221 222 except UnicodeDecodeError:
222 223 pass
223 224
224 225 for enc in from_encoding:
225 226 try:
226 227 return unicode(str_, enc)
227 228 except UnicodeDecodeError:
228 229 pass
229 230
230 231 try:
231 232 import chardet
232 233 encoding = chardet.detect(str_)['encoding']
233 234 if encoding is None:
234 235 raise Exception()
235 236 return str_.decode(encoding)
236 237 except (ImportError, UnicodeDecodeError, Exception):
237 238 return unicode(str_, from_encoding[0], 'replace')
238 239
239 240
240 241 def safe_str(unicode_, to_encoding=None):
241 242 """
242 243 safe str function. Does few trick to turn unicode_ into string
243 244
244 245 In case of UnicodeEncodeError, we try to return it with encoding detected
245 246 by chardet library if it fails fallback to string with errors replaced
246 247
247 248 :param unicode_: unicode to encode
248 249 :rtype: str
249 250 :returns: str object
250 251 """
251 252
252 253 # if it's not basestr cast to str
253 254 if not isinstance(unicode_, basestring):
254 255 return str(unicode_)
255 256
256 257 if isinstance(unicode_, str):
257 258 return unicode_
258 259
259 260 if not to_encoding:
260 261 DEFAULT_ENCODINGS = aslist(rhodecode.CONFIG.get('default_encoding',
261 262 'utf8'), sep=',')
262 263 to_encoding = DEFAULT_ENCODINGS
263 264
264 265 if not isinstance(to_encoding, (list, tuple)):
265 266 to_encoding = [to_encoding]
266 267
267 268 for enc in to_encoding:
268 269 try:
269 270 return unicode_.encode(enc)
270 271 except UnicodeEncodeError:
271 272 pass
272 273
273 274 try:
274 275 import chardet
275 276 encoding = chardet.detect(unicode_)['encoding']
276 277 if encoding is None:
277 278 raise UnicodeEncodeError()
278 279
279 280 return unicode_.encode(encoding)
280 281 except (ImportError, UnicodeEncodeError):
281 282 return unicode_.encode(to_encoding[0], 'replace')
282 283
283 284
284 285 def remove_suffix(s, suffix):
285 286 if s.endswith(suffix):
286 287 s = s[:-1 * len(suffix)]
287 288 return s
288 289
289 290
290 291 def remove_prefix(s, prefix):
291 292 if s.startswith(prefix):
292 293 s = s[len(prefix):]
293 294 return s
294 295
295 296
296 297 def find_calling_context(ignore_modules=None):
297 298 """
298 299 Look through the calling stack and return the frame which called
299 300 this function and is part of core module ( ie. rhodecode.* )
300 301
301 302 :param ignore_modules: list of modules to ignore eg. ['rhodecode.lib']
302 303 """
303 304
304 305 ignore_modules = ignore_modules or []
305 306
306 307 f = sys._getframe(2)
307 308 while f.f_back is not None:
308 309 name = f.f_globals.get('__name__')
309 310 if name and name.startswith(__name__.split('.')[0]):
310 311 if name not in ignore_modules:
311 312 return f
312 313 f = f.f_back
313 314 return None
314 315
315 316
316 317 def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
317 318 """Custom engine_from_config functions."""
318 319 log = logging.getLogger('sqlalchemy.engine')
319 320 engine = sqlalchemy.engine_from_config(configuration, prefix, **kwargs)
320 321
321 322 def color_sql(sql):
322 323 color_seq = '\033[1;33m' # This is yellow: code 33
323 324 normal = '\x1b[0m'
324 325 return ''.join([color_seq, sql, normal])
325 326
326 327 if configuration['debug']:
327 328 # attach events only for debug configuration
328 329
329 330 def before_cursor_execute(conn, cursor, statement,
330 331 parameters, context, executemany):
331 332 setattr(conn, 'query_start_time', time.time())
332 333 log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
333 334 calling_context = find_calling_context(ignore_modules=[
334 335 'rhodecode.lib.caching_query',
335 336 'rhodecode.model.settings',
336 337 ])
337 338 if calling_context:
338 339 log.info(color_sql('call context %s:%s' % (
339 340 calling_context.f_code.co_filename,
340 341 calling_context.f_lineno,
341 342 )))
342 343
343 344 def after_cursor_execute(conn, cursor, statement,
344 345 parameters, context, executemany):
345 346 delattr(conn, 'query_start_time')
346 347
347 348 sqlalchemy.event.listen(engine, "before_cursor_execute",
348 349 before_cursor_execute)
349 350 sqlalchemy.event.listen(engine, "after_cursor_execute",
350 351 after_cursor_execute)
351 352
352 353 return engine
353 354
354 355
355 356 def get_encryption_key(config):
356 357 secret = config.get('rhodecode.encrypted_values.secret')
357 358 default = config['beaker.session.secret']
358 359 return secret or default
359 360
360 361
361 362 def age(prevdate, now=None, show_short_version=False, show_suffix=True,
362 363 short_format=False):
363 364 """
364 365 Turns a datetime into an age string.
365 366 If show_short_version is True, this generates a shorter string with
366 367 an approximate age; ex. '1 day ago', rather than '1 day and 23 hours ago'.
367 368
368 369 * IMPORTANT*
369 370 Code of this function is written in special way so it's easier to
370 371 backport it to javascript. If you mean to update it, please also update
371 372 `jquery.timeago-extension.js` file
372 373
373 374 :param prevdate: datetime object
374 375 :param now: get current time, if not define we use
375 376 `datetime.datetime.now()`
376 377 :param show_short_version: if it should approximate the date and
377 378 return a shorter string
378 379 :param show_suffix:
379 380 :param short_format: show short format, eg 2D instead of 2 days
380 381 :rtype: unicode
381 382 :returns: unicode words describing age
382 383 """
383 from pylons.i18n.translation import _, ungettext
384 384
385 385 def _get_relative_delta(now, prevdate):
386 386 base = dateutil.relativedelta.relativedelta(now, prevdate)
387 387 return {
388 388 'year': base.years,
389 389 'month': base.months,
390 390 'day': base.days,
391 391 'hour': base.hours,
392 392 'minute': base.minutes,
393 393 'second': base.seconds,
394 394 }
395 395
396 396 def _is_leap_year(year):
397 397 return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
398 398
399 399 def get_month(prevdate):
400 400 return prevdate.month
401 401
402 402 def get_year(prevdate):
403 403 return prevdate.year
404 404
405 405 now = now or datetime.datetime.now()
406 406 order = ['year', 'month', 'day', 'hour', 'minute', 'second']
407 407 deltas = {}
408 408 future = False
409 409
410 410 if prevdate > now:
411 411 now_old = now
412 412 now = prevdate
413 413 prevdate = now_old
414 414 future = True
415 415 if future:
416 416 prevdate = prevdate.replace(microsecond=0)
417 417 # Get date parts deltas
418 418 for part in order:
419 419 rel_delta = _get_relative_delta(now, prevdate)
420 420 deltas[part] = rel_delta[part]
421 421
422 422 # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
423 423 # not 1 hour, -59 minutes and -59 seconds)
424 424 offsets = [[5, 60], [4, 60], [3, 24]]
425 425 for element in offsets: # seconds, minutes, hours
426 426 num = element[0]
427 427 length = element[1]
428 428
429 429 part = order[num]
430 430 carry_part = order[num - 1]
431 431
432 432 if deltas[part] < 0:
433 433 deltas[part] += length
434 434 deltas[carry_part] -= 1
435 435
436 436 # Same thing for days except that the increment depends on the (variable)
437 437 # number of days in the month
438 438 month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
439 439 if deltas['day'] < 0:
440 440 if get_month(prevdate) == 2 and _is_leap_year(get_year(prevdate)):
441 441 deltas['day'] += 29
442 442 else:
443 443 deltas['day'] += month_lengths[get_month(prevdate) - 1]
444 444
445 445 deltas['month'] -= 1
446 446
447 447 if deltas['month'] < 0:
448 448 deltas['month'] += 12
449 449 deltas['year'] -= 1
450 450
451 451 # Format the result
452 452 if short_format:
453 453 fmt_funcs = {
454 454 'year': lambda d: u'%dy' % d,
455 455 'month': lambda d: u'%dm' % d,
456 456 'day': lambda d: u'%dd' % d,
457 457 'hour': lambda d: u'%dh' % d,
458 458 'minute': lambda d: u'%dmin' % d,
459 459 'second': lambda d: u'%dsec' % d,
460 460 }
461 461 else:
462 462 fmt_funcs = {
463 'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
464 'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
465 'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
466 'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
467 'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
468 'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
463 'year': lambda d: _pluralize(u'${num} year', u'${num} years', d, mapping={'num': d}).interpolate(),
464 'month': lambda d: _pluralize(u'${num} month', u'${num} months', d, mapping={'num': d}).interpolate(),
465 'day': lambda d: _pluralize(u'${num} day', u'${num} days', d, mapping={'num': d}).interpolate(),
466 'hour': lambda d: _pluralize(u'${num} hour', u'${num} hours', d, mapping={'num': d}).interpolate(),
467 'minute': lambda d: _pluralize(u'${num} minute', u'${num} minutes', d, mapping={'num': d}).interpolate(),
468 'second': lambda d: _pluralize(u'${num} second', u'${num} seconds', d, mapping={'num': d}).interpolate(),
469 469 }
470 470
471 471 i = 0
472 472 for part in order:
473 473 value = deltas[part]
474 474 if value != 0:
475 475
476 476 if i < 5:
477 477 sub_part = order[i + 1]
478 478 sub_value = deltas[sub_part]
479 479 else:
480 480 sub_value = 0
481 481
482 482 if sub_value == 0 or show_short_version:
483 483 _val = fmt_funcs[part](value)
484 484 if future:
485 485 if show_suffix:
486 return _(u'in %s') % _val
486 return _(u'in ${ago}', mapping={'ago': _val})
487 487 else:
488 return _val
488 return _(_val)
489 489
490 490 else:
491 491 if show_suffix:
492 return _(u'%s ago') % _val
492 return _(u'${ago} ago', mapping={'ago': _val})
493 493 else:
494 return _val
494 return _(_val)
495 495
496 496 val = fmt_funcs[part](value)
497 497 val_detail = fmt_funcs[sub_part](sub_value)
498 mapping = {'val': val, 'detail': val_detail}
498 499
499 500 if short_format:
500 datetime_tmpl = u'%s, %s'
501 datetime_tmpl = _(u'${val}, ${detail}', mapping=mapping)
501 502 if show_suffix:
502 datetime_tmpl = _(u'%s, %s ago')
503 datetime_tmpl = _(u'${val}, ${detail} ago', mapping=mapping)
503 504 if future:
504 datetime_tmpl = _(u'in %s, %s')
505 datetime_tmpl = _(u'in ${val}, ${detail}', mapping=mapping)
505 506 else:
506 datetime_tmpl = _(u'%s and %s')
507 datetime_tmpl = _(u'${val} and ${detail}', mapping=mapping)
507 508 if show_suffix:
508 datetime_tmpl = _(u'%s and %s ago')
509 datetime_tmpl = _(u'${val} and ${detail} ago', mapping=mapping)
509 510 if future:
510 datetime_tmpl = _(u'in %s and %s')
511 datetime_tmpl = _(u'in ${val} and ${detail}', mapping=mapping)
511 512
512 return datetime_tmpl % (val, val_detail)
513 return datetime_tmpl
513 514 i += 1
514 515 return _(u'just now')
515 516
516 517
517 518 def uri_filter(uri):
518 519 """
519 520 Removes user:password from given url string
520 521
521 522 :param uri:
522 523 :rtype: unicode
523 524 :returns: filtered list of strings
524 525 """
525 526 if not uri:
526 527 return ''
527 528
528 529 proto = ''
529 530
530 531 for pat in ('https://', 'http://'):
531 532 if uri.startswith(pat):
532 533 uri = uri[len(pat):]
533 534 proto = pat
534 535 break
535 536
536 537 # remove passwords and username
537 538 uri = uri[uri.find('@') + 1:]
538 539
539 540 # get the port
540 541 cred_pos = uri.find(':')
541 542 if cred_pos == -1:
542 543 host, port = uri, None
543 544 else:
544 545 host, port = uri[:cred_pos], uri[cred_pos + 1:]
545 546
546 547 return filter(None, [proto, host, port])
547 548
548 549
549 550 def credentials_filter(uri):
550 551 """
551 552 Returns a url with removed credentials
552 553
553 554 :param uri:
554 555 """
555 556
556 557 uri = uri_filter(uri)
557 558 # check if we have port
558 559 if len(uri) > 2 and uri[2]:
559 560 uri[2] = ':' + uri[2]
560 561
561 562 return ''.join(uri)
562 563
563 564
564 565 def get_clone_url(uri_tmpl, qualifed_home_url, repo_name, repo_id, **override):
565 566 parsed_url = urlobject.URLObject(qualifed_home_url)
566 567 decoded_path = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
567 568 args = {
568 569 'scheme': parsed_url.scheme,
569 570 'user': '',
570 571 # path if we use proxy-prefix
571 572 'netloc': parsed_url.netloc+decoded_path,
572 573 'prefix': decoded_path,
573 574 'repo': repo_name,
574 575 'repoid': str(repo_id)
575 576 }
576 577 args.update(override)
577 578 args['user'] = urllib.quote(safe_str(args['user']))
578 579
579 580 for k, v in args.items():
580 581 uri_tmpl = uri_tmpl.replace('{%s}' % k, v)
581 582
582 583 # remove leading @ sign if it's present. Case of empty user
583 584 url_obj = urlobject.URLObject(uri_tmpl)
584 585 url = url_obj.with_netloc(url_obj.netloc.lstrip('@'))
585 586
586 587 return safe_unicode(url)
587 588
588 589
589 590 def get_commit_safe(repo, commit_id=None, commit_idx=None, pre_load=None):
590 591 """
591 592 Safe version of get_commit if this commit doesn't exists for a
592 593 repository it returns a Dummy one instead
593 594
594 595 :param repo: repository instance
595 596 :param commit_id: commit id as str
596 597 :param pre_load: optional list of commit attributes to load
597 598 """
598 599 # TODO(skreft): remove these circular imports
599 600 from rhodecode.lib.vcs.backends.base import BaseRepository, EmptyCommit
600 601 from rhodecode.lib.vcs.exceptions import RepositoryError
601 602 if not isinstance(repo, BaseRepository):
602 603 raise Exception('You must pass an Repository '
603 604 'object as first argument got %s', type(repo))
604 605
605 606 try:
606 607 commit = repo.get_commit(
607 608 commit_id=commit_id, commit_idx=commit_idx, pre_load=pre_load)
608 609 except (RepositoryError, LookupError):
609 610 commit = EmptyCommit()
610 611 return commit
611 612
612 613
613 614 def datetime_to_time(dt):
614 615 if dt:
615 616 return time.mktime(dt.timetuple())
616 617
617 618
618 619 def time_to_datetime(tm):
619 620 if tm:
620 621 if isinstance(tm, basestring):
621 622 try:
622 623 tm = float(tm)
623 624 except ValueError:
624 625 return
625 626 return datetime.datetime.fromtimestamp(tm)
626 627
627 628
628 629 def time_to_utcdatetime(tm):
629 630 if tm:
630 631 if isinstance(tm, basestring):
631 632 try:
632 633 tm = float(tm)
633 634 except ValueError:
634 635 return
635 636 return datetime.datetime.utcfromtimestamp(tm)
636 637
637 638
638 639 MENTIONS_REGEX = re.compile(
639 640 # ^@ or @ without any special chars in front
640 641 r'(?:^@|[^a-zA-Z0-9\-\_\.]@)'
641 642 # main body starts with letter, then can be . - _
642 643 r'([a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+)',
643 644 re.VERBOSE | re.MULTILINE)
644 645
645 646
646 647 def extract_mentioned_users(s):
647 648 """
648 649 Returns unique usernames from given string s that have @mention
649 650
650 651 :param s: string to get mentions
651 652 """
652 653 usrs = set()
653 654 for username in MENTIONS_REGEX.findall(s):
654 655 usrs.add(username)
655 656
656 657 return sorted(list(usrs), key=lambda k: k.lower())
657 658
658 659
659 660 class StrictAttributeDict(dict):
660 661 """
661 662 Strict Version of Attribute dict which raises an Attribute error when
662 663 requested attribute is not set
663 664 """
664 665 def __getattr__(self, attr):
665 666 try:
666 667 return self[attr]
667 668 except KeyError:
668 669 raise AttributeError('%s object has no attribute %s' % (
669 670 self.__class__, attr))
670 671 __setattr__ = dict.__setitem__
671 672 __delattr__ = dict.__delitem__
672 673
673 674
674 675 class AttributeDict(dict):
675 676 def __getattr__(self, attr):
676 677 return self.get(attr, None)
677 678 __setattr__ = dict.__setitem__
678 679 __delattr__ = dict.__delitem__
679 680
680 681
681 682 def fix_PATH(os_=None):
682 683 """
683 684 Get current active python path, and append it to PATH variable to fix
684 685 issues of subprocess calls and different python versions
685 686 """
686 687 if os_ is None:
687 688 import os
688 689 else:
689 690 os = os_
690 691
691 692 cur_path = os.path.split(sys.executable)[0]
692 693 if not os.environ['PATH'].startswith(cur_path):
693 694 os.environ['PATH'] = '%s:%s' % (cur_path, os.environ['PATH'])
694 695
695 696
696 697 def obfuscate_url_pw(engine):
697 698 _url = engine or ''
698 699 try:
699 700 _url = sqlalchemy.engine.url.make_url(engine)
700 701 if _url.password:
701 702 _url.password = 'XXXXX'
702 703 except Exception:
703 704 pass
704 705 return unicode(_url)
705 706
706 707
707 708 def get_server_url(environ):
708 709 req = webob.Request(environ)
709 710 return req.host_url + req.script_name
710 711
711 712
712 713 def unique_id(hexlen=32):
713 714 alphabet = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz"
714 715 return suuid(truncate_to=hexlen, alphabet=alphabet)
715 716
716 717
717 718 def suuid(url=None, truncate_to=22, alphabet=None):
718 719 """
719 720 Generate and return a short URL safe UUID.
720 721
721 722 If the url parameter is provided, set the namespace to the provided
722 723 URL and generate a UUID.
723 724
724 725 :param url to get the uuid for
725 726 :truncate_to: truncate the basic 22 UUID to shorter version
726 727
727 728 The IDs won't be universally unique any longer, but the probability of
728 729 a collision will still be very low.
729 730 """
730 731 # Define our alphabet.
731 732 _ALPHABET = alphabet or "23456789ABCDEFGHJKLMNPQRSTUVWXYZ"
732 733
733 734 # If no URL is given, generate a random UUID.
734 735 if url is None:
735 736 unique_id = uuid.uuid4().int
736 737 else:
737 738 unique_id = uuid.uuid3(uuid.NAMESPACE_URL, url).int
738 739
739 740 alphabet_length = len(_ALPHABET)
740 741 output = []
741 742 while unique_id > 0:
742 743 digit = unique_id % alphabet_length
743 744 output.append(_ALPHABET[digit])
744 745 unique_id = int(unique_id / alphabet_length)
745 746 return "".join(output)[:truncate_to]
746 747
747 748
748 749 def get_current_rhodecode_user():
749 750 """
750 751 Gets rhodecode user from threadlocal tmpl_context variable if it's
751 752 defined, else returns None.
752 753 """
753 754 from pylons import tmpl_context as c
754 755 if hasattr(c, 'rhodecode_user'):
755 756 return c.rhodecode_user
756 757
757 758 return None
758 759
759 760
760 761 def action_logger_generic(action, namespace=''):
761 762 """
762 763 A generic logger for actions useful to the system overview, tries to find
763 764 an acting user for the context of the call otherwise reports unknown user
764 765
765 766 :param action: logging message eg 'comment 5 deleted'
766 767 :param type: string
767 768
768 769 :param namespace: namespace of the logging message eg. 'repo.comments'
769 770 :param type: string
770 771
771 772 """
772 773
773 774 logger_name = 'rhodecode.actions'
774 775
775 776 if namespace:
776 777 logger_name += '.' + namespace
777 778
778 779 log = logging.getLogger(logger_name)
779 780
780 781 # get a user if we can
781 782 user = get_current_rhodecode_user()
782 783
783 784 logfunc = log.info
784 785
785 786 if not user:
786 787 user = '<unknown user>'
787 788 logfunc = log.warning
788 789
789 790 logfunc('Logging action by {}: {}'.format(user, action))
790 791
791 792
792 793 def escape_split(text, sep=',', maxsplit=-1):
793 794 r"""
794 795 Allows for escaping of the separator: e.g. arg='foo\, bar'
795 796
796 797 It should be noted that the way bash et. al. do command line parsing, those
797 798 single quotes are required.
798 799 """
799 800 escaped_sep = r'\%s' % sep
800 801
801 802 if escaped_sep not in text:
802 803 return text.split(sep, maxsplit)
803 804
804 805 before, _mid, after = text.partition(escaped_sep)
805 806 startlist = before.split(sep, maxsplit) # a regular split is fine here
806 807 unfinished = startlist[-1]
807 808 startlist = startlist[:-1]
808 809
809 810 # recurse because there may be more escaped separators
810 811 endlist = escape_split(after, sep, maxsplit)
811 812
812 813 # finish building the escaped value. we use endlist[0] becaue the first
813 814 # part of the string sent in recursion is the rest of the escaped value.
814 815 unfinished += sep + endlist[0]
815 816
816 817 return startlist + [unfinished] + endlist[1:] # put together all the parts
817 818
818 819
819 820 class OptionalAttr(object):
820 821 """
821 822 Special Optional Option that defines other attribute. Example::
822 823
823 824 def test(apiuser, userid=Optional(OAttr('apiuser')):
824 825 user = Optional.extract(userid)
825 826 # calls
826 827
827 828 """
828 829
829 830 def __init__(self, attr_name):
830 831 self.attr_name = attr_name
831 832
832 833 def __repr__(self):
833 834 return '<OptionalAttr:%s>' % self.attr_name
834 835
835 836 def __call__(self):
836 837 return self
837 838
838 839
839 840 # alias
840 841 OAttr = OptionalAttr
841 842
842 843
843 844 class Optional(object):
844 845 """
845 846 Defines an optional parameter::
846 847
847 848 param = param.getval() if isinstance(param, Optional) else param
848 849 param = param() if isinstance(param, Optional) else param
849 850
850 851 is equivalent of::
851 852
852 853 param = Optional.extract(param)
853 854
854 855 """
855 856
856 857 def __init__(self, type_):
857 858 self.type_ = type_
858 859
859 860 def __repr__(self):
860 861 return '<Optional:%s>' % self.type_.__repr__()
861 862
862 863 def __call__(self):
863 864 return self.getval()
864 865
865 866 def getval(self):
866 867 """
867 868 returns value from this Optional instance
868 869 """
869 870 if isinstance(self.type_, OAttr):
870 871 # use params name
871 872 return self.type_.attr_name
872 873 return self.type_
873 874
874 875 @classmethod
875 876 def extract(cls, val):
876 877 """
877 878 Extracts value from Optional() instance
878 879
879 880 :param val:
880 881 :return: original value if it's not Optional instance else
881 882 value of instance
882 883 """
883 884 if isinstance(val, cls):
884 885 return val.getval()
885 886 return val
886 887
887 888
888 889 def get_routes_generator_for_server_url(server_url):
889 890 parsed_url = urlobject.URLObject(server_url)
890 891 netloc = safe_str(parsed_url.netloc)
891 892 script_name = safe_str(parsed_url.path)
892 893
893 894 if ':' in netloc:
894 895 server_name, server_port = netloc.split(':')
895 896 else:
896 897 server_name = netloc
897 898 server_port = (parsed_url.scheme == 'https' and '443' or '80')
898 899
899 900 environ = {
900 901 'REQUEST_METHOD': 'GET',
901 902 'PATH_INFO': '/',
902 903 'SERVER_NAME': server_name,
903 904 'SERVER_PORT': server_port,
904 905 'SCRIPT_NAME': script_name,
905 906 }
906 907 if parsed_url.scheme == 'https':
907 908 environ['HTTPS'] = 'on'
908 909 environ['wsgi.url_scheme'] = 'https'
909 910
910 911 return routes.util.URLGenerator(rhodecode.CONFIG['routes.map'], environ)
911 912
912 913
913 914 def glob2re(pat):
914 915 """
915 916 Translate a shell PATTERN to a regular expression.
916 917
917 918 There is no way to quote meta-characters.
918 919 """
919 920
920 921 i, n = 0, len(pat)
921 922 res = ''
922 923 while i < n:
923 924 c = pat[i]
924 925 i = i+1
925 926 if c == '*':
926 927 #res = res + '.*'
927 928 res = res + '[^/]*'
928 929 elif c == '?':
929 930 #res = res + '.'
930 931 res = res + '[^/]'
931 932 elif c == '[':
932 933 j = i
933 934 if j < n and pat[j] == '!':
934 935 j = j+1
935 936 if j < n and pat[j] == ']':
936 937 j = j+1
937 938 while j < n and pat[j] != ']':
938 939 j = j+1
939 940 if j >= n:
940 941 res = res + '\\['
941 942 else:
942 943 stuff = pat[i:j].replace('\\','\\\\')
943 944 i = j+1
944 945 if stuff[0] == '!':
945 946 stuff = '^' + stuff[1:]
946 947 elif stuff[0] == '^':
947 948 stuff = '\\' + stuff
948 949 res = '%s[%s]' % (res, stuff)
949 950 else:
950 951 res = res + re.escape(c)
951 952 return res + '\Z(?ms)'
@@ -1,518 +1,527 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 """
23 23 Package for testing various lib/helper functions in rhodecode
24 24 """
25 25
26 26 import datetime
27 27 import string
28 28 import mock
29 29 import pytest
30 30 from rhodecode.tests.utils import run_test_concurrently
31 31 from rhodecode.lib.helpers import InitialsGravatar
32 32
33 33 from rhodecode.lib.utils2 import AttributeDict
34 34 from rhodecode.model.db import Repository
35 35
36 36
37 37 def _urls_for_proto(proto):
38 38 return [
39 39 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
40 40 '%s://127.0.0.1' % proto),
41 41 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
42 42 '%s://127.0.0.1' % proto),
43 43 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
44 44 '%s://127.0.0.1' % proto),
45 45 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
46 46 '%s://127.0.0.1:8080' % proto),
47 47 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
48 48 '%s://domain.org' % proto),
49 49 ('%s://user:pass@domain.org:8080' % proto,
50 50 ['%s://' % proto, 'domain.org', '8080'],
51 51 '%s://domain.org:8080' % proto),
52 52 ]
53 53
54 54 TEST_URLS = _urls_for_proto('http') + _urls_for_proto('https')
55 55
56 56
57 57 @pytest.mark.parametrize("test_url, expected, expected_creds", TEST_URLS)
58 58 def test_uri_filter(test_url, expected, expected_creds):
59 59 from rhodecode.lib.utils2 import uri_filter
60 60 assert uri_filter(test_url) == expected
61 61
62 62
63 63 @pytest.mark.parametrize("test_url, expected, expected_creds", TEST_URLS)
64 64 def test_credentials_filter(test_url, expected, expected_creds):
65 65 from rhodecode.lib.utils2 import credentials_filter
66 66 assert credentials_filter(test_url) == expected_creds
67 67
68 68
69 69 @pytest.mark.parametrize("str_bool, expected", [
70 70 ('t', True),
71 71 ('true', True),
72 72 ('y', True),
73 73 ('yes', True),
74 74 ('on', True),
75 75 ('1', True),
76 76 ('Y', True),
77 77 ('yeS', True),
78 78 ('Y', True),
79 79 ('TRUE', True),
80 80 ('T', True),
81 81 ('False', False),
82 82 ('F', False),
83 83 ('FALSE', False),
84 84 ('0', False),
85 85 ('-1', False),
86 86 ('', False)
87 87 ])
88 88 def test_str2bool(str_bool, expected):
89 89 from rhodecode.lib.utils2 import str2bool
90 90 assert str2bool(str_bool) == expected
91 91
92 92
93 93 @pytest.mark.parametrize("text, expected", reduce(lambda a1,a2:a1+a2, [
94 94 [
95 95 (pref+"", []),
96 96 (pref+"Hi there @marcink", ['marcink']),
97 97 (pref+"Hi there @marcink and @bob", ['bob', 'marcink']),
98 98 (pref+"Hi there @marcink\n", ['marcink']),
99 99 (pref+"Hi there @marcink and @bob\n", ['bob', 'marcink']),
100 100 (pref+"Hi there marcin@rhodecode.com", []),
101 101 (pref+"Hi there @john.malcovic and @bob\n", ['bob', 'john.malcovic']),
102 102 (pref+"This needs to be reviewed: (@marcink,@john)", ["john", "marcink"]),
103 103 (pref+"This needs to be reviewed: (@marcink, @john)", ["john", "marcink"]),
104 104 (pref+"This needs to be reviewed: [@marcink,@john]", ["john", "marcink"]),
105 105 (pref+"This needs to be reviewed: (@marcink @john)", ["john", "marcink"]),
106 106 (pref+"@john @mary, please review", ["john", "mary"]),
107 107 (pref+"@john,@mary, please review", ["john", "mary"]),
108 108 (pref+"Hej @123, @22john,@mary, please review", ['123', '22john', 'mary']),
109 109 (pref+"@first hi there @marcink here's my email marcin@email.com "
110 110 "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three ", ['first', 'lukaszb', 'marcink', 'one', 'one_more22']),
111 111 (pref+"@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl", ['2one_more22', 'john', 'MARCIN', 'maRCiN']),
112 112 (pref+"@marian.user just do it @marco-polo and next extract @marco_polo", ['marco-polo', 'marco_polo', 'marian.user']),
113 113 (pref+"user.dot hej ! not-needed maril@domain.org", []),
114 114 (pref+"\n@marcin", ['marcin']),
115 115 ]
116 116 for pref in ['', '\n', 'hi !', '\t', '\n\n']]))
117 117 def test_mention_extractor(text, expected):
118 118 from rhodecode.lib.utils2 import extract_mentioned_users
119 119 got = extract_mentioned_users(text)
120 120 assert sorted(got, key=lambda x: x.lower()) == got
121 121 assert set(expected) == set(got)
122 122
123 123 @pytest.mark.parametrize("age_args, expected, kw", [
124 124 ({}, u'just now', {}),
125 125 ({'seconds': -1}, u'1 second ago', {}),
126 126 ({'seconds': -60 * 2}, u'2 minutes ago', {}),
127 127 ({'hours': -1}, u'1 hour ago', {}),
128 128 ({'hours': -24}, u'1 day ago', {}),
129 129 ({'hours': -24 * 5}, u'5 days ago', {}),
130 130 ({'months': -1}, u'1 month ago', {}),
131 131 ({'months': -1, 'days': -2}, u'1 month and 2 days ago', {}),
132 132 ({'years': -1, 'months': -1}, u'1 year and 1 month ago', {}),
133 133 ({}, u'just now', {'short_format': True}),
134 134 ({'seconds': -1}, u'1sec ago', {'short_format': True}),
135 135 ({'seconds': -60 * 2}, u'2min ago', {'short_format': True}),
136 136 ({'hours': -1}, u'1h ago', {'short_format': True}),
137 137 ({'hours': -24}, u'1d ago', {'short_format': True}),
138 138 ({'hours': -24 * 5}, u'5d ago', {'short_format': True}),
139 139 ({'months': -1}, u'1m ago', {'short_format': True}),
140 140 ({'months': -1, 'days': -2}, u'1m, 2d ago', {'short_format': True}),
141 141 ({'years': -1, 'months': -1}, u'1y, 1m ago', {'short_format': True}),
142 142 ])
143 143 def test_age(age_args, expected, kw, pylonsapp):
144 144 from rhodecode.lib.utils2 import age
145 145 from dateutil import relativedelta
146 146 n = datetime.datetime(year=2012, month=5, day=17)
147 147 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
148 assert age(n + delt(**age_args), now=n, **kw) == expected
148
149 def translate(elem):
150 return elem.interpolate()
151
152 assert translate(age(n + delt(**age_args), now=n, **kw)) == expected
153
149 154
150 155 @pytest.mark.parametrize("age_args, expected, kw", [
151 156 ({}, u'just now', {}),
152 157 ({'seconds': 1}, u'in 1 second', {}),
153 158 ({'seconds': 60 * 2}, u'in 2 minutes', {}),
154 159 ({'hours': 1}, u'in 1 hour', {}),
155 160 ({'hours': 24}, u'in 1 day', {}),
156 161 ({'hours': 24 * 5}, u'in 5 days', {}),
157 162 ({'months': 1}, u'in 1 month', {}),
158 163 ({'months': 1, 'days': 1}, u'in 1 month and 1 day', {}),
159 164 ({'years': 1, 'months': 1}, u'in 1 year and 1 month', {}),
160 165 ({}, u'just now', {'short_format': True}),
161 166 ({'seconds': 1}, u'in 1sec', {'short_format': True}),
162 167 ({'seconds': 60 * 2}, u'in 2min', {'short_format': True}),
163 168 ({'hours': 1}, u'in 1h', {'short_format': True}),
164 169 ({'hours': 24}, u'in 1d', {'short_format': True}),
165 170 ({'hours': 24 * 5}, u'in 5d', {'short_format': True}),
166 171 ({'months': 1}, u'in 1m', {'short_format': True}),
167 172 ({'months': 1, 'days': 1}, u'in 1m, 1d', {'short_format': True}),
168 173 ({'years': 1, 'months': 1}, u'in 1y, 1m', {'short_format': True}),
169 174 ])
170 175 def test_age_in_future(age_args, expected, kw, pylonsapp):
171 176 from rhodecode.lib.utils2 import age
172 177 from dateutil import relativedelta
173 178 n = datetime.datetime(year=2012, month=5, day=17)
174 179 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
175 assert age(n + delt(**age_args), now=n, **kw) == expected
180
181 def translate(elem):
182 return elem.interpolate()
183
184 assert translate(age(n + delt(**age_args), now=n, **kw)) == expected
176 185
177 186
178 187 def test_tag_exctrator():
179 188 sample = (
180 189 "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
181 190 "[requires] [stale] [see<>=>] [see => http://url.com]"
182 191 "[requires => url] [lang => python] [just a tag] <html_tag first='abc' attr=\"my.url?attr=&another=\"></html_tag>"
183 192 "[,d] [ => ULR ] [obsolete] [desc]]"
184 193 )
185 194 from rhodecode.lib.helpers import desc_stylize, escaped_stylize
186 195 res = desc_stylize(sample)
187 196 assert '<div class="metatag" tag="tag">tag</div>' in res
188 197 assert '<div class="metatag" tag="obsolete">obsolete</div>' in res
189 198 assert '<div class="metatag" tag="stale">stale</div>' in res
190 199 assert '<div class="metatag" tag="lang">python</div>' in res
191 200 assert '<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res
192 201 assert '<div class="metatag" tag="tag">tag</div>' in res
193 202 assert '<html_tag first=\'abc\' attr=\"my.url?attr=&another=\"></html_tag>' in res
194 203
195 204 res_encoded = escaped_stylize(sample)
196 205 assert '<div class="metatag" tag="tag">tag</div>' in res_encoded
197 206 assert '<div class="metatag" tag="obsolete">obsolete</div>' in res_encoded
198 207 assert '<div class="metatag" tag="stale">stale</div>' in res_encoded
199 208 assert '<div class="metatag" tag="lang">python</div>' in res_encoded
200 209 assert '<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res_encoded
201 210 assert '<div class="metatag" tag="tag">tag</div>' in res_encoded
202 211 assert '&lt;html_tag first=&#39;abc&#39; attr=&#34;my.url?attr=&amp;another=&#34;&gt;&lt;/html_tag&gt;' in res_encoded
203 212
204 213
205 214 @pytest.mark.parametrize("tmpl_url, email, expected", [
206 215 ('http://test.com/{email}', 'test@foo.com', 'http://test.com/test@foo.com'),
207 216
208 217 ('http://test.com/{md5email}', 'test@foo.com', 'http://test.com/3cb7232fcc48743000cb86d0d5022bd9'),
209 218 ('http://test.com/{md5email}', 'testΔ…Δ‡@foo.com', 'http://test.com/978debb907a3c55cd741872ab293ef30'),
210 219
211 220 ('http://testX.com/{md5email}?s={size}', 'test@foo.com', 'http://testX.com/3cb7232fcc48743000cb86d0d5022bd9?s=24'),
212 221 ('http://testX.com/{md5email}?s={size}', 'testΔ…Δ‡@foo.com', 'http://testX.com/978debb907a3c55cd741872ab293ef30?s=24'),
213 222
214 223 ('{scheme}://{netloc}/{md5email}/{size}', 'test@foo.com', 'https://server.com/3cb7232fcc48743000cb86d0d5022bd9/24'),
215 224 ('{scheme}://{netloc}/{md5email}/{size}', 'testΔ…Δ‡@foo.com', 'https://server.com/978debb907a3c55cd741872ab293ef30/24'),
216 225
217 226 ('http://test.com/{email}', 'testΔ…Δ‡@foo.com', 'http://test.com/testΔ…Δ‡@foo.com'),
218 227 ('http://test.com/{email}?size={size}', 'test@foo.com', 'http://test.com/test@foo.com?size=24'),
219 228 ('http://test.com/{email}?size={size}', 'testΔ…Δ‡@foo.com', 'http://test.com/testΔ…Δ‡@foo.com?size=24'),
220 229 ])
221 230 def test_gravatar_url_builder(tmpl_url, email, expected, request_stub):
222 231 from rhodecode.lib.helpers import gravatar_url
223 232
224 233 # mock pyramid.threadlocals
225 234 def fake_get_current_request():
226 235 request_stub.scheme = 'https'
227 236 request_stub.host = 'server.com'
228 237 return request_stub
229 238
230 239 # mock pylons.tmpl_context
231 240 def fake_tmpl_context(_url):
232 241 _c = AttributeDict()
233 242 _c.visual = AttributeDict()
234 243 _c.visual.use_gravatar = True
235 244 _c.visual.gravatar_url = _url
236 245
237 246 return _c
238 247
239 248 with mock.patch('rhodecode.lib.helpers.get_current_request',
240 249 fake_get_current_request):
241 250 fake = fake_tmpl_context(_url=tmpl_url)
242 251 with mock.patch('pylons.tmpl_context', fake):
243 252 grav = gravatar_url(email_address=email, size=24)
244 253 assert grav == expected
245 254
246 255
247 256 @pytest.mark.parametrize(
248 257 "email, first_name, last_name, expected_initials, expected_color", [
249 258
250 259 ('test@rhodecode.com', '', '', 'TR', '#8a994d'),
251 260 ('marcin.kuzminski@rhodecode.com', '', '', 'MK', '#6559b3'),
252 261 # special cases of email
253 262 ('john.van.dam@rhodecode.com', '', '', 'JD', '#526600'),
254 263 ('Guido.van.Rossum@rhodecode.com', '', '', 'GR', '#990052'),
255 264 ('Guido.van.Rossum@rhodecode.com', 'Guido', 'Van Rossum', 'GR', '#990052'),
256 265
257 266 ('rhodecode+Guido.van.Rossum@rhodecode.com', '', '', 'RR', '#46598c'),
258 267 ('pclouds@rhodecode.com', 'Nguyα»…n ThΓ‘i', 'Tgọc Duy', 'ND', '#665200'),
259 268
260 269 ('john-brown@foo.com', '', '', 'JF', '#73006b'),
261 270 ('admin@rhodecode.com', 'Marcin', 'Kuzminski', 'MK', '#104036'),
262 271 # partials
263 272 ('admin@rhodecode.com', 'Marcin', '', 'MR', '#104036'), # fn+email
264 273 ('admin@rhodecode.com', '', 'Kuzminski', 'AK', '#104036'), # em+ln
265 274 # non-ascii
266 275 ('admin@rhodecode.com', 'Marcin', 'Śuzminski', 'MS', '#104036'),
267 276 ('marcin.Ε›uzminski@rhodecode.com', '', '', 'MS', '#73000f'),
268 277
269 278 # special cases, LDAP can provide those...
270 279 ('admin@', 'Marcin', 'Śuzminski', 'MS', '#aa00ff'),
271 280 ('marcin.Ε›uzminski', '', '', 'MS', '#402020'),
272 281 ('null', '', '', 'NL', '#8c4646'),
273 282 ])
274 283 def test_initials_gravatar_pick_of_initials_and_color_algo(
275 284 email, first_name, last_name, expected_initials, expected_color):
276 285 instance = InitialsGravatar(email, first_name, last_name)
277 286 assert instance.get_initials() == expected_initials
278 287 assert instance.str2color(email) == expected_color
279 288
280 289
281 290 def test_initials_gravatar_mapping_algo():
282 291 pos = set()
283 292 instance = InitialsGravatar('', '', '')
284 293 iterations = 0
285 294
286 295 variations = []
287 296 for letter1 in string.ascii_letters:
288 297 for letter2 in string.ascii_letters[::-1][:10]:
289 298 for letter3 in string.ascii_letters[:10]:
290 299 variations.append(
291 300 '%s@rhodecode.com' % (letter1+letter2+letter3))
292 301
293 302 max_variations = 4096
294 303 for email in variations[:max_variations]:
295 304 iterations += 1
296 305 pos.add(
297 306 instance.pick_color_bank_index(email,
298 307 instance.get_color_bank()))
299 308
300 309 # we assume that we have match all 256 possible positions,
301 310 # in reasonable amount of different email addresses
302 311 assert len(pos) == 256
303 312 assert iterations == max_variations
304 313
305 314
306 315 @pytest.mark.parametrize("tmpl, repo_name, overrides, prefix, expected", [
307 316 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '', 'http://vps1:8000/group/repo1'),
308 317 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'marcink'}, '', 'http://marcink@vps1:8000/group/repo1'),
309 318 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '/rc', 'http://vps1:8000/rc/group/repo1'),
310 319 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/rc', 'http://user@vps1:8000/rc/group/repo1'),
311 320 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'marcink'}, '/rc', 'http://marcink@vps1:8000/rc/group/repo1'),
312 321 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'user'}, '/rc/', 'http://user@vps1:8000/rc/group/repo1'),
313 322 (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'user': 'marcink'}, '/rc/', 'http://marcink@vps1:8000/rc/group/repo1'),
314 323 ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {}, '', 'http://vps1:8000/_23'),
315 324 ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'marcink'}, '', 'http://marcink@vps1:8000/_23'),
316 325 ('http://{user}@{netloc}/_{repoid}', 'group/repo1', {'user': 'marcink'}, '', 'http://marcink@vps1:8000/_23'),
317 326 ('http://{netloc}/_{repoid}', 'group/repo1', {'user': 'marcink'}, '', 'http://vps1:8000/_23'),
318 327 ('https://{user}@proxy1.server.com/{repo}', 'group/repo1', {'user': 'marcink'}, '', 'https://marcink@proxy1.server.com/group/repo1'),
319 328 ('https://{user}@proxy1.server.com/{repo}', 'group/repo1', {}, '', 'https://proxy1.server.com/group/repo1'),
320 329 ('https://proxy1.server.com/{user}/{repo}', 'group/repo1', {'user': 'marcink'}, '', 'https://proxy1.server.com/marcink/group/repo1'),
321 330 ])
322 331 def test_clone_url_generator(tmpl, repo_name, overrides, prefix, expected):
323 332 from rhodecode.lib.utils2 import get_clone_url
324 333 clone_url = get_clone_url(uri_tmpl=tmpl, qualifed_home_url='http://vps1:8000'+prefix,
325 334 repo_name=repo_name, repo_id=23, **overrides)
326 335 assert clone_url == expected
327 336
328 337
329 338 def _quick_url(text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None):
330 339 """
331 340 Changes `some text url[foo]` => `some text <a href="/">foo</a>
332 341
333 342 :param text:
334 343 """
335 344 import re
336 345 # quickly change expected url[] into a link
337 346 URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
338 347
339 348 def url_func(match_obj):
340 349 _url = match_obj.groups()[0]
341 350 return tmpl % (url_ or '/some-url', _url)
342 351 return URL_PAT.sub(url_func, text)
343 352
344 353
345 354 @pytest.mark.parametrize("sample, expected", [
346 355 ("",
347 356 ""),
348 357 ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
349 358 "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"),
350 359 ("from rev 000000000000",
351 360 "from rev url[000000000000]"),
352 361 ("from rev 000000000000123123 also rev 000000000000",
353 362 "from rev url[000000000000123123] also rev url[000000000000]"),
354 363 ("this should-000 00",
355 364 "this should-000 00"),
356 365 ("longtextffffffffff rev 123123123123",
357 366 "longtextffffffffff rev url[123123123123]"),
358 367 ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
359 368 "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"),
360 369 ("ffffffffffff some text traalaa",
361 370 "url[ffffffffffff] some text traalaa"),
362 371 ("""Multi line
363 372 123123123123
364 373 some text 123123123123
365 374 sometimes !
366 375 """,
367 376 """Multi line
368 377 url[123123123123]
369 378 some text url[123123123123]
370 379 sometimes !
371 380 """)
372 381 ])
373 382 def test_urlify_commits(sample, expected):
374 383 def fake_url(self, *args, **kwargs):
375 384 return '/some-url'
376 385
377 386 expected = _quick_url(expected)
378 387
379 388 with mock.patch('pylons.url', fake_url):
380 389 from rhodecode.lib.helpers import urlify_commits
381 390 assert urlify_commits(sample, 'repo_name') == expected
382 391
383 392
384 393 @pytest.mark.parametrize("sample, expected, url_", [
385 394 ("",
386 395 "",
387 396 ""),
388 397 ("https://svn.apache.org/repos",
389 398 "url[https://svn.apache.org/repos]",
390 399 "https://svn.apache.org/repos"),
391 400 ("http://svn.apache.org/repos",
392 401 "url[http://svn.apache.org/repos]",
393 402 "http://svn.apache.org/repos"),
394 403 ("from rev a also rev http://google.com",
395 404 "from rev a also rev url[http://google.com]",
396 405 "http://google.com"),
397 406 ("""Multi line
398 407 https://foo.bar.com
399 408 some text lalala""",
400 409 """Multi line
401 410 url[https://foo.bar.com]
402 411 some text lalala""",
403 412 "https://foo.bar.com")
404 413 ])
405 414 def test_urlify_test(sample, expected, url_):
406 415 from rhodecode.lib.helpers import urlify_text
407 416 expected = _quick_url(expected, tmpl="""<a href="%s">%s</a>""", url_=url_)
408 417 assert urlify_text(sample) == expected
409 418
410 419
411 420 @pytest.mark.parametrize("test, expected", [
412 421 ("", None),
413 422 ("/_2", '2'),
414 423 ("_2", '2'),
415 424 ("/_2/", '2'),
416 425 ("_2/", '2'),
417 426
418 427 ("/_21", '21'),
419 428 ("_21", '21'),
420 429 ("/_21/", '21'),
421 430 ("_21/", '21'),
422 431
423 432 ("/_21/foobar", '21'),
424 433 ("_21/121", '21'),
425 434 ("/_21/_12", '21'),
426 435 ("_21/rc/foo", '21'),
427 436
428 437 ])
429 438 def test_get_repo_by_id(test, expected):
430 439 from rhodecode.model.repo import RepoModel
431 440 _test = RepoModel()._extract_id_from_repo_name(test)
432 441 assert _test == expected
433 442
434 443
435 444 @pytest.mark.parametrize("test_repo_name, repo_type", [
436 445 ("test_repo_1", None),
437 446 ("repo_group/foobar", None),
438 447 ("test_non_asci_Δ…Δ‡Δ™", None),
439 448 (u"test_non_asci_unicode_Δ…Δ‡Δ™", None),
440 449 ])
441 450 def test_invalidation_context(pylonsapp, test_repo_name, repo_type):
442 451 from beaker.cache import cache_region
443 452 from rhodecode.lib import caches
444 453 from rhodecode.model.db import CacheKey
445 454
446 455 @cache_region('long_term')
447 456 def _dummy_func(cache_key):
448 457 return 'result'
449 458
450 459 invalidator_context = CacheKey.repo_context_cache(
451 460 _dummy_func, test_repo_name, 'repo')
452 461
453 462 with invalidator_context as context:
454 463 invalidated = context.invalidate()
455 464 result = context.compute()
456 465
457 466 assert invalidated == True
458 467 assert 'result' == result
459 468 assert isinstance(context, caches.FreshRegionCache)
460 469
461 470 assert 'InvalidationContext' in repr(invalidator_context)
462 471
463 472 with invalidator_context as context:
464 473 context.invalidate()
465 474 result = context.compute()
466 475
467 476 assert 'result' == result
468 477 assert isinstance(context, caches.ActiveRegionCache)
469 478
470 479
471 480 def test_invalidation_context_exception_in_compute(pylonsapp):
472 481 from rhodecode.model.db import CacheKey
473 482 from beaker.cache import cache_region
474 483
475 484 @cache_region('long_term')
476 485 def _dummy_func(cache_key):
477 486 # this causes error since it doesn't get any params
478 487 raise Exception('ups')
479 488
480 489 invalidator_context = CacheKey.repo_context_cache(
481 490 _dummy_func, 'test_repo_2', 'repo')
482 491
483 492 with pytest.raises(Exception):
484 493 with invalidator_context as context:
485 494 context.invalidate()
486 495 context.compute()
487 496
488 497
489 498 @pytest.mark.parametrize('execution_number', range(5))
490 499 def test_cache_invalidation_race_condition(execution_number, pylonsapp):
491 500 import time
492 501 from beaker.cache import cache_region
493 502 from rhodecode.model.db import CacheKey
494 503
495 504 if CacheKey.metadata.bind.url.get_backend_name() == "mysql":
496 505 reason = (
497 506 'Fails on MariaDB due to some locking issues. Investigation'
498 507 ' needed')
499 508 pytest.xfail(reason=reason)
500 509
501 510 @run_test_concurrently(25)
502 511 def test_create_and_delete_cache_keys():
503 512 time.sleep(0.2)
504 513
505 514 @cache_region('long_term')
506 515 def _dummy_func(cache_key):
507 516 return 'result'
508 517
509 518 invalidator_context = CacheKey.repo_context_cache(
510 519 _dummy_func, 'test_repo_1', 'repo')
511 520
512 521 with invalidator_context as context:
513 522 context.invalidate()
514 523 context.compute()
515 524
516 525 CacheKey.set_invalidate('test_repo_1', delete=True)
517 526
518 527 test_create_and_delete_cache_keys()
@@ -1,36 +1,43 b''
1 1 # Copyright (C) 2016-2017 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 from pyramid.i18n import TranslationStringFactory
20 20
21 21 # Create a translation string factory for the 'rhodecode' domain.
22 22 _ = TranslationStringFactory('rhodecode')
23 23
24 24
25 25 class LazyString(object):
26 26 def __init__(self, *args, **kw):
27 27 self.args = args
28 28 self.kw = kw
29 29
30 30 def __str__(self):
31 31 return _(*self.args, **self.kw)
32 32
33 33
34 34 def lazy_ugettext(*args, **kw):
35 35 """ Lazily evaluated version of _() """
36 36 return LazyString(*args, **kw)
37
38
39 def _pluralize(msgid1, msgid2, n, mapping=None):
40 if n == 1:
41 return _(msgid1, mapping=mapping)
42 else:
43 return _(msgid2, mapping=mapping)
General Comments 0
You need to be logged in to leave comments. Login now