##// END OF EJS Templates
- age tests cannot be dynamic, there are cases when age calculation...
marcink -
r3644:71860d07 beta
parent child Browse files
Show More
@@ -1,612 +1,609 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.utils
4 4 ~~~~~~~~~~~~~~~~~~~
5 5
6 6 Some simple helper functions
7 7
8 8 :created_on: Jan 5, 2011
9 9 :author: marcink
10 10 :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import re
28 28 import sys
29 29 import time
30 30 import datetime
31 31 import traceback
32 32 import webob
33 33
34 34 from pylons.i18n.translation import _, ungettext
35 35 from rhodecode.lib.vcs.utils.lazy import LazyProperty
36 36 from rhodecode.lib.compat import json
37 37
38 38
39 39 def __get_lem():
40 40 """
41 41 Get language extension map based on what's inside pygments lexers
42 42 """
43 43 from pygments import lexers
44 44 from string import lower
45 45 from collections import defaultdict
46 46
47 47 d = defaultdict(lambda: [])
48 48
49 49 def __clean(s):
50 50 s = s.lstrip('*')
51 51 s = s.lstrip('.')
52 52
53 53 if s.find('[') != -1:
54 54 exts = []
55 55 start, stop = s.find('['), s.find(']')
56 56
57 57 for suffix in s[start + 1:stop]:
58 58 exts.append(s[:s.find('[')] + suffix)
59 59 return map(lower, exts)
60 60 else:
61 61 return map(lower, [s])
62 62
63 63 for lx, t in sorted(lexers.LEXERS.items()):
64 64 m = map(__clean, t[-2])
65 65 if m:
66 66 m = reduce(lambda x, y: x + y, m)
67 67 for ext in m:
68 68 desc = lx.replace('Lexer', '')
69 69 d[ext].append(desc)
70 70
71 71 return dict(d)
72 72
73 73
74 74 def str2bool(_str):
75 75 """
76 76 returs True/False value from given string, it tries to translate the
77 77 string into boolean
78 78
79 79 :param _str: string value to translate into boolean
80 80 :rtype: boolean
81 81 :returns: boolean from given string
82 82 """
83 83 if _str is None:
84 84 return False
85 85 if _str in (True, False):
86 86 return _str
87 87 _str = str(_str).strip().lower()
88 88 return _str in ('t', 'true', 'y', 'yes', 'on', '1')
89 89
90 90
91 91 def aslist(obj, sep=None, strip=True):
92 92 """
93 93 Returns given string separated by sep as list
94 94
95 95 :param obj:
96 96 :param sep:
97 97 :param strip:
98 98 """
99 99 if isinstance(obj, (basestring)):
100 100 lst = obj.split(sep)
101 101 if strip:
102 102 lst = [v.strip() for v in lst]
103 103 return lst
104 104 elif isinstance(obj, (list, tuple)):
105 105 return obj
106 106 elif obj is None:
107 107 return []
108 108 else:
109 109 return [obj]
110 110
111 111
112 112 def convert_line_endings(line, mode):
113 113 """
114 114 Converts a given line "line end" accordingly to given mode
115 115
116 116 Available modes are::
117 117 0 - Unix
118 118 1 - Mac
119 119 2 - DOS
120 120
121 121 :param line: given line to convert
122 122 :param mode: mode to convert to
123 123 :rtype: str
124 124 :return: converted line according to mode
125 125 """
126 126 from string import replace
127 127
128 128 if mode == 0:
129 129 line = replace(line, '\r\n', '\n')
130 130 line = replace(line, '\r', '\n')
131 131 elif mode == 1:
132 132 line = replace(line, '\r\n', '\r')
133 133 line = replace(line, '\n', '\r')
134 134 elif mode == 2:
135 135 line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
136 136 return line
137 137
138 138
139 139 def detect_mode(line, default):
140 140 """
141 141 Detects line break for given line, if line break couldn't be found
142 142 given default value is returned
143 143
144 144 :param line: str line
145 145 :param default: default
146 146 :rtype: int
147 147 :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
148 148 """
149 149 if line.endswith('\r\n'):
150 150 return 2
151 151 elif line.endswith('\n'):
152 152 return 0
153 153 elif line.endswith('\r'):
154 154 return 1
155 155 else:
156 156 return default
157 157
158 158
159 159 def generate_api_key(username, salt=None):
160 160 """
161 161 Generates unique API key for given username, if salt is not given
162 162 it'll be generated from some random string
163 163
164 164 :param username: username as string
165 165 :param salt: salt to hash generate KEY
166 166 :rtype: str
167 167 :returns: sha1 hash from username+salt
168 168 """
169 169 from tempfile import _RandomNameSequence
170 170 import hashlib
171 171
172 172 if salt is None:
173 173 salt = _RandomNameSequence().next()
174 174
175 175 return hashlib.sha1(username + salt).hexdigest()
176 176
177 177
178 178 def safe_int(val, default=None):
179 179 """
180 180 Returns int() of val if val is not convertable to int use default
181 181 instead
182 182
183 183 :param val:
184 184 :param default:
185 185 """
186 186
187 187 try:
188 188 val = int(val)
189 189 except (ValueError, TypeError):
190 190 val = default
191 191
192 192 return val
193 193
194 194
195 195 def safe_unicode(str_, from_encoding=None):
196 196 """
197 197 safe unicode function. Does few trick to turn str_ into unicode
198 198
199 199 In case of UnicodeDecode error we try to return it with encoding detected
200 200 by chardet library if it fails fallback to unicode with errors replaced
201 201
202 202 :param str_: string to decode
203 203 :rtype: unicode
204 204 :returns: unicode object
205 205 """
206 206 if isinstance(str_, unicode):
207 207 return str_
208 208
209 209 if not from_encoding:
210 210 import rhodecode
211 211 DEFAULT_ENCODINGS = aslist(rhodecode.CONFIG.get('default_encoding',
212 212 'utf8'), sep=',')
213 213 from_encoding = DEFAULT_ENCODINGS
214 214
215 215 if not isinstance(from_encoding, (list, tuple)):
216 216 from_encoding = [from_encoding]
217 217
218 218 try:
219 219 return unicode(str_)
220 220 except UnicodeDecodeError:
221 221 pass
222 222
223 223 for enc in from_encoding:
224 224 try:
225 225 return unicode(str_, enc)
226 226 except UnicodeDecodeError:
227 227 pass
228 228
229 229 try:
230 230 import chardet
231 231 encoding = chardet.detect(str_)['encoding']
232 232 if encoding is None:
233 233 raise Exception()
234 234 return str_.decode(encoding)
235 235 except (ImportError, UnicodeDecodeError, Exception):
236 236 return unicode(str_, from_encoding[0], 'replace')
237 237
238 238
239 239 def safe_str(unicode_, to_encoding=None):
240 240 """
241 241 safe str function. Does few trick to turn unicode_ into string
242 242
243 243 In case of UnicodeEncodeError we try to return it with encoding detected
244 244 by chardet library if it fails fallback to string with errors replaced
245 245
246 246 :param unicode_: unicode to encode
247 247 :rtype: str
248 248 :returns: str object
249 249 """
250 250
251 251 # if it's not basestr cast to str
252 252 if not isinstance(unicode_, basestring):
253 253 return str(unicode_)
254 254
255 255 if isinstance(unicode_, str):
256 256 return unicode_
257 257
258 258 if not to_encoding:
259 259 import rhodecode
260 260 DEFAULT_ENCODINGS = aslist(rhodecode.CONFIG.get('default_encoding',
261 261 'utf8'), sep=',')
262 262 to_encoding = DEFAULT_ENCODINGS
263 263
264 264 if not isinstance(to_encoding, (list, tuple)):
265 265 to_encoding = [to_encoding]
266 266
267 267 for enc in to_encoding:
268 268 try:
269 269 return unicode_.encode(enc)
270 270 except UnicodeEncodeError:
271 271 pass
272 272
273 273 try:
274 274 import chardet
275 275 encoding = chardet.detect(unicode_)['encoding']
276 276 if encoding is None:
277 277 raise UnicodeEncodeError()
278 278
279 279 return unicode_.encode(encoding)
280 280 except (ImportError, UnicodeEncodeError):
281 281 return unicode_.encode(to_encoding[0], 'replace')
282 282
283 283 return safe_str
284 284
285 285
286 286 def remove_suffix(s, suffix):
287 287 if s.endswith(suffix):
288 288 s = s[:-1 * len(suffix)]
289 289 return s
290 290
291 291
292 292 def remove_prefix(s, prefix):
293 293 if s.startswith(prefix):
294 294 s = s[len(prefix):]
295 295 return s
296 296
297 297
298 298 def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
299 299 """
300 300 Custom engine_from_config functions that makes sure we use NullPool for
301 301 file based sqlite databases. This prevents errors on sqlite. This only
302 302 applies to sqlalchemy versions < 0.7.0
303 303
304 304 """
305 305 import sqlalchemy
306 306 from sqlalchemy import engine_from_config as efc
307 307 import logging
308 308
309 309 if int(sqlalchemy.__version__.split('.')[1]) < 7:
310 310
311 311 # This solution should work for sqlalchemy < 0.7.0, and should use
312 312 # proxy=TimerProxy() for execution time profiling
313 313
314 314 from sqlalchemy.pool import NullPool
315 315 url = configuration[prefix + 'url']
316 316
317 317 if url.startswith('sqlite'):
318 318 kwargs.update({'poolclass': NullPool})
319 319 return efc(configuration, prefix, **kwargs)
320 320 else:
321 321 import time
322 322 from sqlalchemy import event
323 323 from sqlalchemy.engine import Engine
324 324
325 325 log = logging.getLogger('sqlalchemy.engine')
326 326 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
327 327 engine = efc(configuration, prefix, **kwargs)
328 328
329 329 def color_sql(sql):
330 330 COLOR_SEQ = "\033[1;%dm"
331 331 COLOR_SQL = YELLOW
332 332 normal = '\x1b[0m'
333 333 return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])
334 334
335 335 if configuration['debug']:
336 336 #attach events only for debug configuration
337 337
338 338 def before_cursor_execute(conn, cursor, statement,
339 339 parameters, context, executemany):
340 340 context._query_start_time = time.time()
341 341 log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
342 342
343 343 def after_cursor_execute(conn, cursor, statement,
344 344 parameters, context, executemany):
345 345 total = time.time() - context._query_start_time
346 346 log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
347 347
348 348 event.listen(engine, "before_cursor_execute",
349 349 before_cursor_execute)
350 350 event.listen(engine, "after_cursor_execute",
351 351 after_cursor_execute)
352 352
353 353 return engine
354 354
355 355
356 def age(prevdate, show_short_version=False):
356 def age(prevdate, show_short_version=False, now=None):
357 357 """
358 358 turns a datetime into an age string.
359 359 If show_short_version is True, then it will generate a not so accurate but shorter string,
360 360 example: 2days ago, instead of 2 days and 23 hours ago.
361 361
362 362 :param prevdate: datetime object
363 363 :param show_short_version: if it should aproximate the date and return a shorter string
364 364 :rtype: unicode
365 365 :returns: unicode words describing age
366 366 """
367 now = datetime.datetime.now()
368 now = now.replace(microsecond=0)
367 now = now or datetime.datetime.now()
369 368 order = ['year', 'month', 'day', 'hour', 'minute', 'second']
370 369 deltas = {}
371 370 future = False
372 371
373 372 if prevdate > now:
374 373 now, prevdate = prevdate, now
375 374 future = True
376
375 if future:
376 prevdate = prevdate.replace(microsecond=0)
377 377 # Get date parts deltas
378 from dateutil import relativedelta
378 379 for part in order:
379 if future:
380 from dateutil import relativedelta
381 d = relativedelta.relativedelta(now, prevdate)
382 deltas[part] = getattr(d, part + 's')
383 else:
384 deltas[part] = getattr(now, part) - getattr(prevdate, part)
380 d = relativedelta.relativedelta(now, prevdate)
381 deltas[part] = getattr(d, part + 's')
385 382
386 383 # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
387 384 # not 1 hour, -59 minutes and -59 seconds)
388 385 for num, length in [(5, 60), (4, 60), (3, 24)]: # seconds, minutes, hours
389 386 part = order[num]
390 387 carry_part = order[num - 1]
391 388
392 389 if deltas[part] < 0:
393 390 deltas[part] += length
394 391 deltas[carry_part] -= 1
395 392
396 393 # Same thing for days except that the increment depends on the (variable)
397 394 # number of days in the month
398 395 month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
399 396 if deltas['day'] < 0:
400 397 if prevdate.month == 2 and (prevdate.year % 4 == 0 and
401 398 (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
402 399 deltas['day'] += 29
403 400 else:
404 401 deltas['day'] += month_lengths[prevdate.month - 1]
405 402
406 403 deltas['month'] -= 1
407 404
408 405 if deltas['month'] < 0:
409 406 deltas['month'] += 12
410 407 deltas['year'] -= 1
411 408
412 409 # Format the result
413 410 fmt_funcs = {
414 411 'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
415 412 'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
416 413 'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
417 414 'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
418 415 'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
419 416 'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
420 417 }
421 418
422 419 for i, part in enumerate(order):
423 420 value = deltas[part]
424 421 if value == 0:
425 422 continue
426 423
427 424 if i < 5:
428 425 sub_part = order[i + 1]
429 426 sub_value = deltas[sub_part]
430 427 else:
431 428 sub_value = 0
432 429
433 430 if sub_value == 0 or show_short_version:
434 431 if future:
435 432 return _(u'in %s') % fmt_funcs[part](value)
436 433 else:
437 434 return _(u'%s ago') % fmt_funcs[part](value)
438 435 if future:
439 436 return _(u'in %s and %s') % (fmt_funcs[part](value),
440 437 fmt_funcs[sub_part](sub_value))
441 438 else:
442 439 return _(u'%s and %s ago') % (fmt_funcs[part](value),
443 440 fmt_funcs[sub_part](sub_value))
444 441
445 442 return _(u'just now')
446 443
447 444
448 445 def uri_filter(uri):
449 446 """
450 447 Removes user:password from given url string
451 448
452 449 :param uri:
453 450 :rtype: unicode
454 451 :returns: filtered list of strings
455 452 """
456 453 if not uri:
457 454 return ''
458 455
459 456 proto = ''
460 457
461 458 for pat in ('https://', 'http://'):
462 459 if uri.startswith(pat):
463 460 uri = uri[len(pat):]
464 461 proto = pat
465 462 break
466 463
467 464 # remove passwords and username
468 465 uri = uri[uri.find('@') + 1:]
469 466
470 467 # get the port
471 468 cred_pos = uri.find(':')
472 469 if cred_pos == -1:
473 470 host, port = uri, None
474 471 else:
475 472 host, port = uri[:cred_pos], uri[cred_pos + 1:]
476 473
477 474 return filter(None, [proto, host, port])
478 475
479 476
480 477 def credentials_filter(uri):
481 478 """
482 479 Returns a url with removed credentials
483 480
484 481 :param uri:
485 482 """
486 483
487 484 uri = uri_filter(uri)
488 485 #check if we have port
489 486 if len(uri) > 2 and uri[2]:
490 487 uri[2] = ':' + uri[2]
491 488
492 489 return ''.join(uri)
493 490
494 491
495 492 def get_changeset_safe(repo, rev):
496 493 """
497 494 Safe version of get_changeset if this changeset doesn't exists for a
498 495 repo it returns a Dummy one instead
499 496
500 497 :param repo:
501 498 :param rev:
502 499 """
503 500 from rhodecode.lib.vcs.backends.base import BaseRepository
504 501 from rhodecode.lib.vcs.exceptions import RepositoryError
505 502 from rhodecode.lib.vcs.backends.base import EmptyChangeset
506 503 if not isinstance(repo, BaseRepository):
507 504 raise Exception('You must pass an Repository '
508 505 'object as first argument got %s', type(repo))
509 506
510 507 try:
511 508 cs = repo.get_changeset(rev)
512 509 except RepositoryError:
513 510 cs = EmptyChangeset(requested_revision=rev)
514 511 return cs
515 512
516 513
517 514 def datetime_to_time(dt):
518 515 if dt:
519 516 return time.mktime(dt.timetuple())
520 517
521 518
522 519 def time_to_datetime(tm):
523 520 if tm:
524 521 if isinstance(tm, basestring):
525 522 try:
526 523 tm = float(tm)
527 524 except ValueError:
528 525 return
529 526 return datetime.datetime.fromtimestamp(tm)
530 527
531 528 MENTIONS_REGEX = r'(?:^@|\s@)([a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+)(?:\s{1})'
532 529
533 530
534 531 def extract_mentioned_users(s):
535 532 """
536 533 Returns unique usernames from given string s that have @mention
537 534
538 535 :param s: string to get mentions
539 536 """
540 537 usrs = set()
541 538 for username in re.findall(MENTIONS_REGEX, s):
542 539 usrs.add(username)
543 540
544 541 return sorted(list(usrs), key=lambda k: k.lower())
545 542
546 543
547 544 class AttributeDict(dict):
548 545 def __getattr__(self, attr):
549 546 return self.get(attr, None)
550 547 __setattr__ = dict.__setitem__
551 548 __delattr__ = dict.__delitem__
552 549
553 550
554 551 def fix_PATH(os_=None):
555 552 """
556 553 Get current active python path, and append it to PATH variable to fix issues
557 554 of subprocess calls and different python versions
558 555 """
559 556 if os_ is None:
560 557 import os
561 558 else:
562 559 os = os_
563 560
564 561 cur_path = os.path.split(sys.executable)[0]
565 562 if not os.environ['PATH'].startswith(cur_path):
566 563 os.environ['PATH'] = '%s:%s' % (cur_path, os.environ['PATH'])
567 564
568 565
569 566 def obfuscate_url_pw(engine):
570 567 _url = engine or ''
571 568 from sqlalchemy.engine import url as sa_url
572 569 try:
573 570 _url = sa_url.make_url(engine)
574 571 if _url.password:
575 572 _url.password = 'XXXXX'
576 573 except Exception:
577 574 pass
578 575 return str(_url)
579 576
580 577
581 578 def get_server_url(environ):
582 579 req = webob.Request(environ)
583 580 return req.host_url + req.script_name
584 581
585 582
586 583 def _extract_extras(env=None):
587 584 """
588 585 Extracts the rc extras data from os.environ, and wraps it into named
589 586 AttributeDict object
590 587 """
591 588 if not env:
592 589 env = os.environ
593 590
594 591 try:
595 592 rc_extras = json.loads(env['RC_SCM_DATA'])
596 593 except Exception:
597 594 print os.environ
598 595 print >> sys.stderr, traceback.format_exc()
599 596 rc_extras = {}
600 597
601 598 try:
602 599 for k in ['username', 'repository', 'locked_by', 'scm', 'make_lock',
603 600 'action', 'ip']:
604 601 rc_extras[k]
605 602 except KeyError, e:
606 603 raise Exception('Missing key %s in os.environ %s' % (e, rc_extras))
607 604
608 605 return AttributeDict(rc_extras)
609 606
610 607
611 608 def _set_extras(extras):
612 609 os.environ['RC_SCM_DATA'] = json.dumps(extras)
@@ -1,290 +1,295 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_libs
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6
7 7 Package for testing various lib/helper functions in rhodecode
8 8
9 9 :created_on: Jun 9, 2011
10 10 :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25 from __future__ import with_statement
26 26 import unittest
27 27 import datetime
28 28 import hashlib
29 29 import mock
30 30 from rhodecode.tests import *
31 31
32 32 proto = 'http'
33 33 TEST_URLS = [
34 34 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
35 35 '%s://127.0.0.1' % proto),
36 36 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
37 37 '%s://127.0.0.1' % proto),
38 38 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
39 39 '%s://127.0.0.1' % proto),
40 40 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
41 41 '%s://127.0.0.1:8080' % proto),
42 42 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
43 43 '%s://domain.org' % proto),
44 44 ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
45 45 '8080'],
46 46 '%s://domain.org:8080' % proto),
47 47 ]
48 48
49 49 proto = 'https'
50 50 TEST_URLS += [
51 51 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
52 52 '%s://127.0.0.1' % proto),
53 53 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
54 54 '%s://127.0.0.1' % proto),
55 55 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
56 56 '%s://127.0.0.1' % proto),
57 57 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
58 58 '%s://127.0.0.1:8080' % proto),
59 59 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
60 60 '%s://domain.org' % proto),
61 61 ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
62 62 '8080'],
63 63 '%s://domain.org:8080' % proto),
64 64 ]
65 65
66 66
67 67 class TestLibs(unittest.TestCase):
68 68
69 69 @parameterized.expand(TEST_URLS)
70 70 def test_uri_filter(self, test_url, expected, expected_creds):
71 71 from rhodecode.lib.utils2 import uri_filter
72 72 self.assertEqual(uri_filter(test_url), expected)
73 73
74 74 @parameterized.expand(TEST_URLS)
75 75 def test_credentials_filter(self, test_url, expected, expected_creds):
76 76 from rhodecode.lib.utils2 import credentials_filter
77 77 self.assertEqual(credentials_filter(test_url), expected_creds)
78 78
79 79 @parameterized.expand([('t', True),
80 80 ('true', True),
81 81 ('y', True),
82 82 ('yes', True),
83 83 ('on', True),
84 84 ('1', True),
85 85 ('Y', True),
86 86 ('yeS', True),
87 87 ('Y', True),
88 88 ('TRUE', True),
89 89 ('T', True),
90 90 ('False', False),
91 91 ('F', False),
92 92 ('FALSE', False),
93 93 ('0', False),
94 94 ('-1', False),
95 95 ('', False)
96 96 ])
97 97 def test_str2bool(self, str_bool, expected):
98 98 from rhodecode.lib.utils2 import str2bool
99 99 self.assertEqual(str2bool(str_bool), expected)
100 100
101 101 def test_mention_extractor(self):
102 102 from rhodecode.lib.utils2 import extract_mentioned_users
103 103 sample = (
104 104 "@first hi there @marcink here's my email marcin@email.com "
105 105 "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
106 106 "@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl "
107 107 "@marian.user just do it @marco-polo and next extract @marco_polo "
108 108 "user.dot hej ! not-needed maril@domain.org"
109 109 )
110 110
111 111 s = sorted([
112 112 'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john',
113 113 'marian.user', 'marco-polo', 'marco_polo'
114 114 ], key=lambda k: k.lower())
115 115 self.assertEqual(s, extract_mentioned_users(sample))
116 116
117 def test_age(self):
117 @parameterized.expand([
118 (dict(), u'just now'),
119 (dict(seconds= -1), u'1 second ago'),
120 (dict(seconds= -60 * 2), u'2 minutes ago'),
121 (dict(hours= -1), u'1 hour ago'),
122 (dict(hours= -24), u'1 day ago'),
123 (dict(hours= -24 * 5), u'5 days ago'),
124 (dict(months= -1), u'1 month ago'),
125 (dict(months= -1, days= -2), u'1 month and 2 days ago'),
126 (dict(years= -1, months= -1), u'1 year and 1 month ago'),
127 ])
128 def test_age(self, age_args, expected):
118 129 from rhodecode.lib.utils2 import age
119 130 from dateutil import relativedelta
120 n = datetime.datetime.now()
131 n = datetime.datetime(year=2012, month=5, day=17)
121 132 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
133 self.assertEqual(age(n + delt(**age_args), now=n), expected)
134
135 @parameterized.expand([
122 136
123 self.assertEqual(age(n), u'just now')
124 self.assertEqual(age(n + delt(seconds=-1)), u'1 second ago')
125 self.assertEqual(age(n + delt(seconds=-60 * 2)), u'2 minutes ago')
126 self.assertEqual(age(n + delt(hours=-1)), u'1 hour ago')
127 self.assertEqual(age(n + delt(hours=-24)), u'1 day ago')
128 self.assertEqual(age(n + delt(hours=-24 * 5)), u'5 days ago')
129 self.assertEqual(age(n + delt(months=-1)), u'1 month ago')
130 self.assertEqual(age(n + delt(months=-1, days=-2)), u'1 month and 2 days ago')
131 self.assertEqual(age(n + delt(years=-1, months=-1)), u'1 year and 1 month ago')
132
133 def test_age_in_future(self):
137 (dict(), u'just now'),
138 (dict(seconds=1), u'in 1 second'),
139 (dict(seconds=60 * 2), u'in 2 minutes'),
140 (dict(hours=1), u'in 1 hour'),
141 (dict(hours=24), u'in 1 day'),
142 (dict(hours=24 * 5), u'in 5 days'),
143 (dict(months=1), u'in 1 month'),
144 (dict(months=1, days=1), u'in 1 month and 1 day'),
145 (dict(years=1, months=1), u'in 1 year and 1 month')
146 ])
147 def test_age_in_future(self, age_args, expected):
134 148 from rhodecode.lib.utils2 import age
135 149 from dateutil import relativedelta
136 n = datetime.datetime.now()
150 n = datetime.datetime(year=2012, month=5, day=17)
137 151 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
138
139 self.assertEqual(age(n), u'just now')
140 self.assertEqual(age(n + delt(seconds=1)), u'in 1 second')
141 self.assertEqual(age(n + delt(seconds=60 * 2)), u'in 2 minutes')
142 self.assertEqual(age(n + delt(hours=1)), u'in 1 hour')
143 self.assertEqual(age(n + delt(hours=24)), u'in 1 day')
144 self.assertEqual(age(n + delt(hours=24 * 5)), u'in 5 days')
145 self.assertEqual(age(n + delt(months=1)), u'in 1 month')
146 self.assertEqual(age(n + delt(months=1, days=1)), u'in 1 month and 1 day')
147 self.assertEqual(age(n + delt(years=1, months=1)), u'in 1 year and 1 month')
152 self.assertEqual(age(n + delt(**age_args), now=n), expected)
148 153
149 154 def test_tag_exctrator(self):
150 155 sample = (
151 156 "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
152 157 "[requires] [stale] [see<>=>] [see => http://url.com]"
153 158 "[requires => url] [lang => python] [just a tag]"
154 159 "[,d] [ => ULR ] [obsolete] [desc]]"
155 160 )
156 161 from rhodecode.lib.helpers import desc_stylize
157 162 res = desc_stylize(sample)
158 163 self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
159 164 self.assertTrue('<div class="metatag" tag="obsolete">obsolete</div>' in res)
160 165 self.assertTrue('<div class="metatag" tag="stale">stale</div>' in res)
161 166 self.assertTrue('<div class="metatag" tag="lang">python</div>' in res)
162 167 self.assertTrue('<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res)
163 168 self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
164 169
165 170 def test_alternative_gravatar(self):
166 171 from rhodecode.lib.helpers import gravatar_url
167 172 _md5 = lambda s: hashlib.md5(s).hexdigest()
168 173
169 174 def fake_conf(**kwargs):
170 175 from pylons import config
171 176 config['app_conf'] = {}
172 177 config['app_conf']['use_gravatar'] = True
173 178 config['app_conf'].update(kwargs)
174 179 return config
175 180
176 181 class fake_url():
177 182 @classmethod
178 183 def current(cls, *args, **kwargs):
179 184 return 'https://server.com'
180 185
181 186 with mock.patch('pylons.url', fake_url):
182 187 fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
183 188 with mock.patch('pylons.config', fake):
184 189 from pylons import url
185 190 assert url.current() == 'https://server.com'
186 191 grav = gravatar_url(email_address='test@foo.com', size=24)
187 192 assert grav == 'http://test.com/test@foo.com'
188 193
189 194 fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
190 195 with mock.patch('pylons.config', fake):
191 196 grav = gravatar_url(email_address='test@foo.com', size=24)
192 197 assert grav == 'http://test.com/test@foo.com'
193 198
194 199 fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}')
195 200 with mock.patch('pylons.config', fake):
196 201 em = 'test@foo.com'
197 202 grav = gravatar_url(email_address=em, size=24)
198 203 assert grav == 'http://test.com/%s' % (_md5(em))
199 204
200 205 fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}/{size}')
201 206 with mock.patch('pylons.config', fake):
202 207 em = 'test@foo.com'
203 208 grav = gravatar_url(email_address=em, size=24)
204 209 assert grav == 'http://test.com/%s/%s' % (_md5(em), 24)
205 210
206 211 fake = fake_conf(alternative_gravatar_url='{scheme}://{netloc}/{md5email}/{size}')
207 212 with mock.patch('pylons.config', fake):
208 213 em = 'test@foo.com'
209 214 grav = gravatar_url(email_address=em, size=24)
210 215 assert grav == 'https://server.com/%s/%s' % (_md5(em), 24)
211 216
212 217 def _quick_url(self, text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None):
213 218 """
214 219 Changes `some text url[foo]` => `some text <a href="/">foo</a>
215 220
216 221 :param text:
217 222 """
218 223 import re
219 #quickly change expected url[] into a link
224 # quickly change expected url[] into a link
220 225 URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
221 226
222 227 def url_func(match_obj):
223 228 _url = match_obj.groups()[0]
224 229 return tmpl % (url_ or '/some-url', _url)
225 230 return URL_PAT.sub(url_func, text)
226 231
227 232 @parameterized.expand([
228 233 ("",
229 234 ""),
230 235 ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
231 236 "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"),
232 237 ("from rev 000000000000",
233 238 "from rev url[000000000000]"),
234 239 ("from rev 000000000000123123 also rev 000000000000",
235 240 "from rev url[000000000000123123] also rev url[000000000000]"),
236 241 ("this should-000 00",
237 242 "this should-000 00"),
238 243 ("longtextffffffffff rev 123123123123",
239 244 "longtextffffffffff rev url[123123123123]"),
240 245 ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
241 246 "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"),
242 247 ("ffffffffffff some text traalaa",
243 248 "url[ffffffffffff] some text traalaa"),
244 249 ("""Multi line
245 250 123123123123
246 251 some text 123123123123
247 252 sometimes !
248 253 """,
249 254 """Multi line
250 255 url[123123123123]
251 256 some text url[123123123123]
252 257 sometimes !
253 258 """)
254 259 ])
255 260 def test_urlify_changesets(self, sample, expected):
256 261 def fake_url(self, *args, **kwargs):
257 262 return '/some-url'
258 263
259 264 expected = self._quick_url(expected)
260 265
261 266 with mock.patch('pylons.url', fake_url):
262 267 from rhodecode.lib.helpers import urlify_changesets
263 268 self.assertEqual(urlify_changesets(sample, 'repo_name'), expected)
264 269
265 270 @parameterized.expand([
266 271 ("",
267 272 "",
268 273 ""),
269 274 ("https://svn.apache.org/repos",
270 275 "url[https://svn.apache.org/repos]",
271 276 "https://svn.apache.org/repos"),
272 277 ("http://svn.apache.org/repos",
273 278 "url[http://svn.apache.org/repos]",
274 279 "http://svn.apache.org/repos"),
275 280 ("from rev a also rev http://google.com",
276 281 "from rev a also rev url[http://google.com]",
277 282 "http://google.com"),
278 283 ("""Multi line
279 284 https://foo.bar.com
280 285 some text lalala""",
281 286 """Multi line
282 287 url[https://foo.bar.com]
283 288 some text lalala""",
284 289 "https://foo.bar.com")
285 290 ])
286 291 def test_urlify_test(self, sample, expected, url_):
287 292 from rhodecode.lib.helpers import urlify_text
288 293 expected = self._quick_url(expected,
289 294 tmpl="""<a href="%s">%s</a>""", url_=url_)
290 295 self.assertEqual(urlify_text(sample), expected)
General Comments 0
You need to be logged in to leave comments. Login now