##// END OF EJS Templates
fix unreachable code errors
marcink -
r3885:712610e0 beta
parent child Browse files
Show More
@@ -1,171 +1,170 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.bin.gist
4 4 ~~~~~~~~~~~~~~~~~~
5 5
6 6 Gist CLI client for RhodeCode
7 7
8 8 :created_on: May 9, 2013
9 9 :author: marcink
10 10 :copyright: (C) 2010-2013 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 from __future__ import with_statement
27 27 import os
28 28 import sys
29 29 import stat
30 30 import argparse
31 31 import fileinput
32 32
33 33 from rhodecode.bin.base import json, api_call, RcConf, FORMAT_JSON, FORMAT_PRETTY
34 34
35 35
36 36 def argparser(argv):
37 37 usage = (
38 38 "rhodecode-gist [-h] [--format=FORMAT] [--apikey=APIKEY] [--apihost=APIHOST] "
39 39 "[--config=CONFIG] [--save-config] [GIST OPTIONS] "
40 40 "[filename or stdin use - for terminal stdin ]\n"
41 41 "Create config file: rhodecode-gist --apikey=<key> --apihost=http://rhodecode.server --save-config"
42 42 )
43 43
44 44 parser = argparse.ArgumentParser(description='RhodeCode Gist cli',
45 45 usage=usage)
46 46
47 47 ## config
48 48 group = parser.add_argument_group('config')
49 49 group.add_argument('--apikey', help='api access key')
50 50 group.add_argument('--apihost', help='api host')
51 51 group.add_argument('--config', help='config file path DEFAULT: ~/.rhodecode')
52 52 group.add_argument('--save-config', action='store_true',
53 53 help='save the given config into a file')
54 54
55 55 group = parser.add_argument_group('GIST')
56 56 group.add_argument('-p', '--private', action='store_true',
57 57 help='create private Gist')
58 58 group.add_argument('-f', '--filename',
59 59 help='set uploaded gist filename, '
60 60 'also defines syntax highlighting')
61 61 group.add_argument('-d', '--description', help='Gist description')
62 62 group.add_argument('-l', '--lifetime', metavar='MINUTES',
63 63 help='gist lifetime in minutes, -1 (DEFAULT) is forever')
64 64 group.add_argument('--format', dest='format', type=str,
65 65 help='output format DEFAULT: `%s` can '
66 66 'be also `%s`' % (FORMAT_PRETTY, FORMAT_JSON),
67 67 default=FORMAT_PRETTY
68 68 )
69 69 args, other = parser.parse_known_args()
70 70 return parser, args, other
71 71
72 72
73 73 def _run(argv):
74 74 conf = None
75 75 parser, args, other = argparser(argv)
76 76
77 77 api_credentials_given = (args.apikey and args.apihost)
78 78 if args.save_config:
79 79 if not api_credentials_given:
80 80 raise parser.error('--save-config requires --apikey and --apihost')
81 81 conf = RcConf(config_location=args.config,
82 82 autocreate=True, config={'apikey': args.apikey,
83 83 'apihost': args.apihost})
84 84 sys.exit()
85 85
86 86 if not conf:
87 87 conf = RcConf(config_location=args.config, autoload=True)
88 88 if not conf:
89 89 if not api_credentials_given:
90 90 parser.error('Could not find config file and missing '
91 91 '--apikey or --apihost in params')
92 92
93 93 apikey = args.apikey or conf['apikey']
94 94 host = args.apihost or conf['apihost']
95 95 DEFAULT_FILENAME = 'gistfile1.txt'
96 96 if other:
97 97 # skip multifiles for now
98 98 filename = other[0]
99 99 if filename == '-':
100 100 filename = DEFAULT_FILENAME
101 101 gist_content = ''
102 102 for line in fileinput.input('-'):
103 103 gist_content += line
104 104 else:
105 105 with open(filename, 'rb') as f:
106 106 gist_content = f.read()
107 107
108 108 else:
109 109 filename = DEFAULT_FILENAME
110 110 gist_content = None
111 111 # little bit hacky but cross platform check where the
112 112 # stdin comes from we skip the terminal case it can be handled by '-'
113 113 mode = os.fstat(0).st_mode
114 114 if stat.S_ISFIFO(mode):
115 115 # "stdin is piped"
116 116 gist_content = sys.stdin.read()
117 117 elif stat.S_ISREG(mode):
118 118 # "stdin is redirected"
119 119 gist_content = sys.stdin.read()
120 120 else:
121 121 # "stdin is terminal"
122 122 pass
123 123
124 124 # make sure we don't upload binary stuff
125 125 if gist_content and '\0' in gist_content:
126 126 raise Exception('Error: binary files upload is not possible')
127 127
128 128 filename = os.path.basename(args.filename or filename)
129 129 if gist_content:
130 130 files = {
131 131 filename: {
132 132 'content': gist_content,
133 133 'lexer': None
134 134 }
135 135 }
136 136
137 137 margs = dict(
138 138 gist_lifetime=args.lifetime,
139 139 gist_description=args.description,
140 140 gist_type='private' if args.private else 'public',
141 141 files=files
142 142 )
143 143
144 144 json_data = api_call(apikey, host, 'create_gist', **margs)['result']
145 145 if args.format == FORMAT_JSON:
146 146 print json.dumps(json_data)
147 147 elif args.format == FORMAT_PRETTY:
148 148 print 'Created %s gist %s' % (json_data['gist_type'],
149 149 json_data['gist_url'])
150 150 return 0
151 151
152 152
153 153 def main(argv=None):
154 154 """
155 155 Main execution function for cli
156 156
157 157 :param argv:
158 158 """
159 159 if argv is None:
160 160 argv = sys.argv
161 161
162 162 try:
163 163 return _run(argv)
164 164 except Exception, e:
165 raise
166 165 print e
167 166 return 1
168 167
169 168
170 169 if __name__ == '__main__':
171 170 sys.exit(main(sys.argv))
@@ -1,646 +1,644 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.utils
4 4 ~~~~~~~~~~~~~~~~~~~
5 5
6 6 Some simple helper functions
7 7
8 8 :created_on: Jan 5, 2011
9 9 :author: marcink
10 10 :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import re
28 28 import sys
29 29 import time
30 30 import uuid
31 31 import datetime
32 32 import traceback
33 33 import webob
34 34
35 35 from pylons.i18n.translation import _, ungettext
36 36 from rhodecode.lib.vcs.utils.lazy import LazyProperty
37 37 from rhodecode.lib.compat import json
38 38
39 39
40 40 def __get_lem():
41 41 """
42 42 Get language extension map based on what's inside pygments lexers
43 43 """
44 44 from pygments import lexers
45 45 from string import lower
46 46 from collections import defaultdict
47 47
48 48 d = defaultdict(lambda: [])
49 49
50 50 def __clean(s):
51 51 s = s.lstrip('*')
52 52 s = s.lstrip('.')
53 53
54 54 if s.find('[') != -1:
55 55 exts = []
56 56 start, stop = s.find('['), s.find(']')
57 57
58 58 for suffix in s[start + 1:stop]:
59 59 exts.append(s[:s.find('[')] + suffix)
60 60 return map(lower, exts)
61 61 else:
62 62 return map(lower, [s])
63 63
64 64 for lx, t in sorted(lexers.LEXERS.items()):
65 65 m = map(__clean, t[-2])
66 66 if m:
67 67 m = reduce(lambda x, y: x + y, m)
68 68 for ext in m:
69 69 desc = lx.replace('Lexer', '')
70 70 d[ext].append(desc)
71 71
72 72 return dict(d)
73 73
74 74
75 75 def str2bool(_str):
76 76 """
77 77 returs True/False value from given string, it tries to translate the
78 78 string into boolean
79 79
80 80 :param _str: string value to translate into boolean
81 81 :rtype: boolean
82 82 :returns: boolean from given string
83 83 """
84 84 if _str is None:
85 85 return False
86 86 if _str in (True, False):
87 87 return _str
88 88 _str = str(_str).strip().lower()
89 89 return _str in ('t', 'true', 'y', 'yes', 'on', '1')
90 90
91 91
92 92 def aslist(obj, sep=None, strip=True):
93 93 """
94 94 Returns given string separated by sep as list
95 95
96 96 :param obj:
97 97 :param sep:
98 98 :param strip:
99 99 """
100 100 if isinstance(obj, (basestring)):
101 101 lst = obj.split(sep)
102 102 if strip:
103 103 lst = [v.strip() for v in lst]
104 104 return lst
105 105 elif isinstance(obj, (list, tuple)):
106 106 return obj
107 107 elif obj is None:
108 108 return []
109 109 else:
110 110 return [obj]
111 111
112 112
113 113 def convert_line_endings(line, mode):
114 114 """
115 115 Converts a given line "line end" accordingly to given mode
116 116
117 117 Available modes are::
118 118 0 - Unix
119 119 1 - Mac
120 120 2 - DOS
121 121
122 122 :param line: given line to convert
123 123 :param mode: mode to convert to
124 124 :rtype: str
125 125 :return: converted line according to mode
126 126 """
127 127 from string import replace
128 128
129 129 if mode == 0:
130 130 line = replace(line, '\r\n', '\n')
131 131 line = replace(line, '\r', '\n')
132 132 elif mode == 1:
133 133 line = replace(line, '\r\n', '\r')
134 134 line = replace(line, '\n', '\r')
135 135 elif mode == 2:
136 136 line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
137 137 return line
138 138
139 139
140 140 def detect_mode(line, default):
141 141 """
142 142 Detects line break for given line, if line break couldn't be found
143 143 given default value is returned
144 144
145 145 :param line: str line
146 146 :param default: default
147 147 :rtype: int
148 148 :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
149 149 """
150 150 if line.endswith('\r\n'):
151 151 return 2
152 152 elif line.endswith('\n'):
153 153 return 0
154 154 elif line.endswith('\r'):
155 155 return 1
156 156 else:
157 157 return default
158 158
159 159
160 160 def generate_api_key(username, salt=None):
161 161 """
162 162 Generates unique API key for given username, if salt is not given
163 163 it'll be generated from some random string
164 164
165 165 :param username: username as string
166 166 :param salt: salt to hash generate KEY
167 167 :rtype: str
168 168 :returns: sha1 hash from username+salt
169 169 """
170 170 from tempfile import _RandomNameSequence
171 171 import hashlib
172 172
173 173 if salt is None:
174 174 salt = _RandomNameSequence().next()
175 175
176 176 return hashlib.sha1(username + salt).hexdigest()
177 177
178 178
179 179 def safe_int(val, default=None):
180 180 """
181 181 Returns int() of val if val is not convertable to int use default
182 182 instead
183 183
184 184 :param val:
185 185 :param default:
186 186 """
187 187
188 188 try:
189 189 val = int(val)
190 190 except (ValueError, TypeError):
191 191 val = default
192 192
193 193 return val
194 194
195 195
196 196 def safe_unicode(str_, from_encoding=None):
197 197 """
198 198 safe unicode function. Does few trick to turn str_ into unicode
199 199
200 200 In case of UnicodeDecode error we try to return it with encoding detected
201 201 by chardet library if it fails fallback to unicode with errors replaced
202 202
203 203 :param str_: string to decode
204 204 :rtype: unicode
205 205 :returns: unicode object
206 206 """
207 207 if isinstance(str_, unicode):
208 208 return str_
209 209
210 210 if not from_encoding:
211 211 import rhodecode
212 212 DEFAULT_ENCODINGS = aslist(rhodecode.CONFIG.get('default_encoding',
213 213 'utf8'), sep=',')
214 214 from_encoding = DEFAULT_ENCODINGS
215 215
216 216 if not isinstance(from_encoding, (list, tuple)):
217 217 from_encoding = [from_encoding]
218 218
219 219 try:
220 220 return unicode(str_)
221 221 except UnicodeDecodeError:
222 222 pass
223 223
224 224 for enc in from_encoding:
225 225 try:
226 226 return unicode(str_, enc)
227 227 except UnicodeDecodeError:
228 228 pass
229 229
230 230 try:
231 231 import chardet
232 232 encoding = chardet.detect(str_)['encoding']
233 233 if encoding is None:
234 234 raise Exception()
235 235 return str_.decode(encoding)
236 236 except (ImportError, UnicodeDecodeError, Exception):
237 237 return unicode(str_, from_encoding[0], 'replace')
238 238
239 239
240 240 def safe_str(unicode_, to_encoding=None):
241 241 """
242 242 safe str function. Does few trick to turn unicode_ into string
243 243
244 244 In case of UnicodeEncodeError we try to return it with encoding detected
245 245 by chardet library if it fails fallback to string with errors replaced
246 246
247 247 :param unicode_: unicode to encode
248 248 :rtype: str
249 249 :returns: str object
250 250 """
251 251
252 252 # if it's not basestr cast to str
253 253 if not isinstance(unicode_, basestring):
254 254 return str(unicode_)
255 255
256 256 if isinstance(unicode_, str):
257 257 return unicode_
258 258
259 259 if not to_encoding:
260 260 import rhodecode
261 261 DEFAULT_ENCODINGS = aslist(rhodecode.CONFIG.get('default_encoding',
262 262 'utf8'), sep=',')
263 263 to_encoding = DEFAULT_ENCODINGS
264 264
265 265 if not isinstance(to_encoding, (list, tuple)):
266 266 to_encoding = [to_encoding]
267 267
268 268 for enc in to_encoding:
269 269 try:
270 270 return unicode_.encode(enc)
271 271 except UnicodeEncodeError:
272 272 pass
273 273
274 274 try:
275 275 import chardet
276 276 encoding = chardet.detect(unicode_)['encoding']
277 277 if encoding is None:
278 278 raise UnicodeEncodeError()
279 279
280 280 return unicode_.encode(encoding)
281 281 except (ImportError, UnicodeEncodeError):
282 282 return unicode_.encode(to_encoding[0], 'replace')
283 283
284 return safe_str
285
286 284
287 285 def remove_suffix(s, suffix):
288 286 if s.endswith(suffix):
289 287 s = s[:-1 * len(suffix)]
290 288 return s
291 289
292 290
293 291 def remove_prefix(s, prefix):
294 292 if s.startswith(prefix):
295 293 s = s[len(prefix):]
296 294 return s
297 295
298 296
299 297 def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
300 298 """
301 299 Custom engine_from_config functions that makes sure we use NullPool for
302 300 file based sqlite databases. This prevents errors on sqlite. This only
303 301 applies to sqlalchemy versions < 0.7.0
304 302
305 303 """
306 304 import sqlalchemy
307 305 from sqlalchemy import engine_from_config as efc
308 306 import logging
309 307
310 308 if int(sqlalchemy.__version__.split('.')[1]) < 7:
311 309
312 310 # This solution should work for sqlalchemy < 0.7.0, and should use
313 311 # proxy=TimerProxy() for execution time profiling
314 312
315 313 from sqlalchemy.pool import NullPool
316 314 url = configuration[prefix + 'url']
317 315
318 316 if url.startswith('sqlite'):
319 317 kwargs.update({'poolclass': NullPool})
320 318 return efc(configuration, prefix, **kwargs)
321 319 else:
322 320 import time
323 321 from sqlalchemy import event
324 322 from sqlalchemy.engine import Engine
325 323
326 324 log = logging.getLogger('sqlalchemy.engine')
327 325 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
328 326 engine = efc(configuration, prefix, **kwargs)
329 327
330 328 def color_sql(sql):
331 329 COLOR_SEQ = "\033[1;%dm"
332 330 COLOR_SQL = YELLOW
333 331 normal = '\x1b[0m'
334 332 return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])
335 333
336 334 if configuration['debug']:
337 335 #attach events only for debug configuration
338 336
339 337 def before_cursor_execute(conn, cursor, statement,
340 338 parameters, context, executemany):
341 339 context._query_start_time = time.time()
342 340 log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
343 341
344 342 def after_cursor_execute(conn, cursor, statement,
345 343 parameters, context, executemany):
346 344 total = time.time() - context._query_start_time
347 345 log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
348 346
349 347 event.listen(engine, "before_cursor_execute",
350 348 before_cursor_execute)
351 349 event.listen(engine, "after_cursor_execute",
352 350 after_cursor_execute)
353 351
354 352 return engine
355 353
356 354
357 355 def age(prevdate, show_short_version=False, now=None):
358 356 """
359 357 turns a datetime into an age string.
360 358 If show_short_version is True, then it will generate a not so accurate but shorter string,
361 359 example: 2days ago, instead of 2 days and 23 hours ago.
362 360
363 361 :param prevdate: datetime object
364 362 :param show_short_version: if it should aproximate the date and return a shorter string
365 363 :rtype: unicode
366 364 :returns: unicode words describing age
367 365 """
368 366 now = now or datetime.datetime.now()
369 367 order = ['year', 'month', 'day', 'hour', 'minute', 'second']
370 368 deltas = {}
371 369 future = False
372 370
373 371 if prevdate > now:
374 372 now, prevdate = prevdate, now
375 373 future = True
376 374 if future:
377 375 prevdate = prevdate.replace(microsecond=0)
378 376 # Get date parts deltas
379 377 from dateutil import relativedelta
380 378 for part in order:
381 379 d = relativedelta.relativedelta(now, prevdate)
382 380 deltas[part] = getattr(d, part + 's')
383 381
384 382 # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
385 383 # not 1 hour, -59 minutes and -59 seconds)
386 384 for num, length in [(5, 60), (4, 60), (3, 24)]: # seconds, minutes, hours
387 385 part = order[num]
388 386 carry_part = order[num - 1]
389 387
390 388 if deltas[part] < 0:
391 389 deltas[part] += length
392 390 deltas[carry_part] -= 1
393 391
394 392 # Same thing for days except that the increment depends on the (variable)
395 393 # number of days in the month
396 394 month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
397 395 if deltas['day'] < 0:
398 396 if prevdate.month == 2 and (prevdate.year % 4 == 0 and
399 397 (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
400 398 deltas['day'] += 29
401 399 else:
402 400 deltas['day'] += month_lengths[prevdate.month - 1]
403 401
404 402 deltas['month'] -= 1
405 403
406 404 if deltas['month'] < 0:
407 405 deltas['month'] += 12
408 406 deltas['year'] -= 1
409 407
410 408 # Format the result
411 409 fmt_funcs = {
412 410 'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
413 411 'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
414 412 'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
415 413 'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
416 414 'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
417 415 'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
418 416 }
419 417
420 418 for i, part in enumerate(order):
421 419 value = deltas[part]
422 420 if value == 0:
423 421 continue
424 422
425 423 if i < 5:
426 424 sub_part = order[i + 1]
427 425 sub_value = deltas[sub_part]
428 426 else:
429 427 sub_value = 0
430 428
431 429 if sub_value == 0 or show_short_version:
432 430 if future:
433 431 return _(u'in %s') % fmt_funcs[part](value)
434 432 else:
435 433 return _(u'%s ago') % fmt_funcs[part](value)
436 434 if future:
437 435 return _(u'in %s and %s') % (fmt_funcs[part](value),
438 436 fmt_funcs[sub_part](sub_value))
439 437 else:
440 438 return _(u'%s and %s ago') % (fmt_funcs[part](value),
441 439 fmt_funcs[sub_part](sub_value))
442 440
443 441 return _(u'just now')
444 442
445 443
446 444 def uri_filter(uri):
447 445 """
448 446 Removes user:password from given url string
449 447
450 448 :param uri:
451 449 :rtype: unicode
452 450 :returns: filtered list of strings
453 451 """
454 452 if not uri:
455 453 return ''
456 454
457 455 proto = ''
458 456
459 457 for pat in ('https://', 'http://'):
460 458 if uri.startswith(pat):
461 459 uri = uri[len(pat):]
462 460 proto = pat
463 461 break
464 462
465 463 # remove passwords and username
466 464 uri = uri[uri.find('@') + 1:]
467 465
468 466 # get the port
469 467 cred_pos = uri.find(':')
470 468 if cred_pos == -1:
471 469 host, port = uri, None
472 470 else:
473 471 host, port = uri[:cred_pos], uri[cred_pos + 1:]
474 472
475 473 return filter(None, [proto, host, port])
476 474
477 475
478 476 def credentials_filter(uri):
479 477 """
480 478 Returns a url with removed credentials
481 479
482 480 :param uri:
483 481 """
484 482
485 483 uri = uri_filter(uri)
486 484 #check if we have port
487 485 if len(uri) > 2 and uri[2]:
488 486 uri[2] = ':' + uri[2]
489 487
490 488 return ''.join(uri)
491 489
492 490
493 491 def get_changeset_safe(repo, rev):
494 492 """
495 493 Safe version of get_changeset if this changeset doesn't exists for a
496 494 repo it returns a Dummy one instead
497 495
498 496 :param repo:
499 497 :param rev:
500 498 """
501 499 from rhodecode.lib.vcs.backends.base import BaseRepository
502 500 from rhodecode.lib.vcs.exceptions import RepositoryError
503 501 from rhodecode.lib.vcs.backends.base import EmptyChangeset
504 502 if not isinstance(repo, BaseRepository):
505 503 raise Exception('You must pass an Repository '
506 504 'object as first argument got %s', type(repo))
507 505
508 506 try:
509 507 cs = repo.get_changeset(rev)
510 508 except RepositoryError:
511 509 cs = EmptyChangeset(requested_revision=rev)
512 510 return cs
513 511
514 512
515 513 def datetime_to_time(dt):
516 514 if dt:
517 515 return time.mktime(dt.timetuple())
518 516
519 517
520 518 def time_to_datetime(tm):
521 519 if tm:
522 520 if isinstance(tm, basestring):
523 521 try:
524 522 tm = float(tm)
525 523 except ValueError:
526 524 return
527 525 return datetime.datetime.fromtimestamp(tm)
528 526
529 527 MENTIONS_REGEX = r'(?:^@|\s@)([a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+)(?:\s{1})'
530 528
531 529
532 530 def extract_mentioned_users(s):
533 531 """
534 532 Returns unique usernames from given string s that have @mention
535 533
536 534 :param s: string to get mentions
537 535 """
538 536 usrs = set()
539 537 for username in re.findall(MENTIONS_REGEX, s):
540 538 usrs.add(username)
541 539
542 540 return sorted(list(usrs), key=lambda k: k.lower())
543 541
544 542
545 543 class AttributeDict(dict):
546 544 def __getattr__(self, attr):
547 545 return self.get(attr, None)
548 546 __setattr__ = dict.__setitem__
549 547 __delattr__ = dict.__delitem__
550 548
551 549
552 550 def fix_PATH(os_=None):
553 551 """
554 552 Get current active python path, and append it to PATH variable to fix issues
555 553 of subprocess calls and different python versions
556 554 """
557 555 if os_ is None:
558 556 import os
559 557 else:
560 558 os = os_
561 559
562 560 cur_path = os.path.split(sys.executable)[0]
563 561 if not os.environ['PATH'].startswith(cur_path):
564 562 os.environ['PATH'] = '%s:%s' % (cur_path, os.environ['PATH'])
565 563
566 564
567 565 def obfuscate_url_pw(engine):
568 566 _url = engine or ''
569 567 from sqlalchemy.engine import url as sa_url
570 568 try:
571 569 _url = sa_url.make_url(engine)
572 570 if _url.password:
573 571 _url.password = 'XXXXX'
574 572 except Exception:
575 573 pass
576 574 return str(_url)
577 575
578 576
579 577 def get_server_url(environ):
580 578 req = webob.Request(environ)
581 579 return req.host_url + req.script_name
582 580
583 581
584 582 def _extract_extras(env=None):
585 583 """
586 584 Extracts the rc extras data from os.environ, and wraps it into named
587 585 AttributeDict object
588 586 """
589 587 if not env:
590 588 env = os.environ
591 589
592 590 try:
593 591 rc_extras = json.loads(env['RC_SCM_DATA'])
594 592 except Exception:
595 593 print os.environ
596 594 print >> sys.stderr, traceback.format_exc()
597 595 rc_extras = {}
598 596
599 597 try:
600 598 for k in ['username', 'repository', 'locked_by', 'scm', 'make_lock',
601 599 'action', 'ip']:
602 600 rc_extras[k]
603 601 except KeyError, e:
604 602 raise Exception('Missing key %s in os.environ %s' % (e, rc_extras))
605 603
606 604 return AttributeDict(rc_extras)
607 605
608 606
609 607 def _set_extras(extras):
610 608 os.environ['RC_SCM_DATA'] = json.dumps(extras)
611 609
612 610
613 611 def unique_id(hexlen=32):
614 612 alphabet = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz"
615 613 return suuid(truncate_to=hexlen, alphabet=alphabet)
616 614
617 615
618 616 def suuid(url=None, truncate_to=22, alphabet=None):
619 617 """
620 618 Generate and return a short URL safe UUID.
621 619
622 620 If the url parameter is provided, set the namespace to the provided
623 621 URL and generate a UUID.
624 622
625 623 :param url to get the uuid for
626 624 :truncate_to: truncate the basic 22 UUID to shorter version
627 625
628 626 The IDs won't be universally unique any longer, but the probability of
629 627 a collision will still be very low.
630 628 """
631 629 # Define our alphabet.
632 630 _ALPHABET = alphabet or "23456789ABCDEFGHJKLMNPQRSTUVWXYZ"
633 631
634 632 # If no URL is given, generate a random UUID.
635 633 if url is None:
636 634 unique_id = uuid.uuid4().int
637 635 else:
638 636 unique_id = uuid.uuid3(uuid.NAMESPACE_URL, url).int
639 637
640 638 alphabet_length = len(_ALPHABET)
641 639 output = []
642 640 while unique_id > 0:
643 641 digit = unique_id % alphabet_length
644 642 output.append(_ALPHABET[digit])
645 643 unique_id = int(unique_id / alphabet_length)
646 644 return "".join(output)[:truncate_to]
@@ -1,190 +1,188 b''
1 1 """
2 2 This module provides some useful tools for ``vcs`` like annotate/diff html
3 3 output. It also includes some internal helpers.
4 4 """
5 5 import sys
6 6 import time
7 7 import datetime
8 8
9 9
10 10 def makedate():
11 11 lt = time.localtime()
12 12 if lt[8] == 1 and time.daylight:
13 13 tz = time.altzone
14 14 else:
15 15 tz = time.timezone
16 16 return time.mktime(lt), tz
17 17
18 18
19 19 def aslist(obj, sep=None, strip=True):
20 20 """
21 21 Returns given string separated by sep as list
22 22
23 23 :param obj:
24 24 :param sep:
25 25 :param strip:
26 26 """
27 27 if isinstance(obj, (basestring)):
28 28 lst = obj.split(sep)
29 29 if strip:
30 30 lst = [v.strip() for v in lst]
31 31 return lst
32 32 elif isinstance(obj, (list, tuple)):
33 33 return obj
34 34 elif obj is None:
35 35 return []
36 36 else:
37 37 return [obj]
38 38
39 39
40 40 def date_fromtimestamp(unixts, tzoffset=0):
41 41 """
42 42 Makes a local datetime object out of unix timestamp
43 43
44 44 :param unixts:
45 45 :param tzoffset:
46 46 """
47 47
48 48 return datetime.datetime.fromtimestamp(float(unixts))
49 49
50 50
51 51 def safe_int(val, default=None):
52 52 """
53 53 Returns int() of val if val is not convertable to int use default
54 54 instead
55 55
56 56 :param val:
57 57 :param default:
58 58 """
59 59
60 60 try:
61 61 val = int(val)
62 62 except (ValueError, TypeError):
63 63 val = default
64 64
65 65 return val
66 66
67 67
68 68 def safe_unicode(str_, from_encoding=None):
69 69 """
70 70 safe unicode function. Does few trick to turn str_ into unicode
71 71
72 72 In case of UnicodeDecode error we try to return it with encoding detected
73 73 by chardet library if it fails fallback to unicode with errors replaced
74 74
75 75 :param str_: string to decode
76 76 :rtype: unicode
77 77 :returns: unicode object
78 78 """
79 79 if isinstance(str_, unicode):
80 80 return str_
81 81
82 82 if not from_encoding:
83 83 from rhodecode.lib.vcs.conf import settings
84 84 from_encoding = settings.DEFAULT_ENCODINGS
85 85
86 86 if not isinstance(from_encoding, (list, tuple)):
87 87 from_encoding = [from_encoding]
88 88
89 89 try:
90 90 return unicode(str_)
91 91 except UnicodeDecodeError:
92 92 pass
93 93
94 94 for enc in from_encoding:
95 95 try:
96 96 return unicode(str_, enc)
97 97 except UnicodeDecodeError:
98 98 pass
99 99
100 100 try:
101 101 import chardet
102 102 encoding = chardet.detect(str_)['encoding']
103 103 if encoding is None:
104 104 raise Exception()
105 105 return str_.decode(encoding)
106 106 except (ImportError, UnicodeDecodeError, Exception):
107 107 return unicode(str_, from_encoding[0], 'replace')
108 108
109 109
110 110 def safe_str(unicode_, to_encoding=None):
111 111 """
112 112 safe str function. Does few trick to turn unicode_ into string
113 113
114 114 In case of UnicodeEncodeError we try to return it with encoding detected
115 115 by chardet library if it fails fallback to string with errors replaced
116 116
117 117 :param unicode_: unicode to encode
118 118 :rtype: str
119 119 :returns: str object
120 120 """
121 121
122 122 # if it's not basestr cast to str
123 123 if not isinstance(unicode_, basestring):
124 124 return str(unicode_)
125 125
126 126 if isinstance(unicode_, str):
127 127 return unicode_
128 128
129 129 if not to_encoding:
130 130 from rhodecode.lib.vcs.conf import settings
131 131 to_encoding = settings.DEFAULT_ENCODINGS
132 132
133 133 if not isinstance(to_encoding, (list, tuple)):
134 134 to_encoding = [to_encoding]
135 135
136 136 for enc in to_encoding:
137 137 try:
138 138 return unicode_.encode(enc)
139 139 except UnicodeEncodeError:
140 140 pass
141 141
142 142 try:
143 143 import chardet
144 144 encoding = chardet.detect(unicode_)['encoding']
145 145 if encoding is None:
146 146 raise UnicodeEncodeError()
147 147
148 148 return unicode_.encode(encoding)
149 149 except (ImportError, UnicodeEncodeError):
150 150 return unicode_.encode(to_encoding[0], 'replace')
151 151
152 return safe_str
153
154 152
155 153 def author_email(author):
156 154 """
157 155 returns email address of given author.
158 156 If any of <,> sign are found, it fallbacks to regex findall()
159 157 and returns first found result or empty string
160 158
161 159 Regex taken from http://www.regular-expressions.info/email.html
162 160 """
163 161 import re
164 162 r = author.find('>')
165 163 l = author.find('<')
166 164
167 165 if l == -1 or r == -1:
168 166 # fallback to regex match of email out of a string
169 167 email_re = re.compile(r"""[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!"""
170 168 r"""#$%&'*+/=?^_`{|}~-]+)*@(?:[a-z0-9](?:[a-z"""
171 169 r"""0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]"""
172 170 r"""*[a-z0-9])?""", re.IGNORECASE)
173 171 m = re.findall(email_re, author)
174 172 return m[0] if m else ''
175 173
176 174 return author[l + 1:r].strip()
177 175
178 176
179 177 def author_name(author):
180 178 """
181 179 get name of author, or else username.
182 180 It'll try to find an email in the author string and just cut it off
183 181 to get the username
184 182 """
185 183
186 184 if not '@' in author:
187 185 return author
188 186 else:
189 187 return author.replace(author_email(author), '').replace('<', '')\
190 188 .replace('>', '').strip()
@@ -1,221 +1,220 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_hg_operations
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Test suite for making push/pull operations
7 7
8 8 :created_on: Dec 30, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import sys
28 28 import shutil
29 29 import logging
30 30 from os.path import join as jn
31 31 from os.path import dirname as dn
32 32
33 33 from tempfile import _RandomNameSequence
34 34 from subprocess import Popen, PIPE
35 35
36 36 from paste.deploy import appconfig
37 37 from pylons import config
38 38 from sqlalchemy import engine_from_config
39 39
40 40 from rhodecode.lib.utils import add_cache
41 41 from rhodecode.model import init_model
42 42 from rhodecode.model import meta
43 43 from rhodecode.model.db import User, Repository
44 44 from rhodecode.lib.auth import get_crypt_password
45 45
46 46 from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
47 47 from rhodecode.config.environment import load_environment
48 48
49 49 rel_path = dn(dn(dn(dn(os.path.abspath(__file__)))))
50 50 conf = appconfig('config:rc.ini', relative_to=rel_path)
51 51 load_environment(conf.global_conf, conf.local_conf)
52 52
53 53 add_cache(conf)
54 54
55 55 USER = 'test_admin'
56 56 PASS = 'test12'
57 57 HOST = 'rc.local'
58 58 METHOD = 'pull'
59 59 DEBUG = True
60 60 log = logging.getLogger(__name__)
61 61
62 62
63 63 class Command(object):
64 64
65 65 def __init__(self, cwd):
66 66 self.cwd = cwd
67 67
68 68 def execute(self, cmd, *args):
69 69 """Runs command on the system with given ``args``.
70 70 """
71 71
72 72 command = cmd + ' ' + ' '.join(args)
73 73 log.debug('Executing %s' % command)
74 74 if DEBUG:
75 75 print command
76 76 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
77 77 stdout, stderr = p.communicate()
78 78 if DEBUG:
79 79 print stdout, stderr
80 80 return stdout, stderr
81 81
82 82
83 83 def get_session():
84 84 engine = engine_from_config(conf, 'sqlalchemy.db1.')
85 85 init_model(engine)
86 86 sa = meta.Session
87 87 return sa
88 88
89 89
90 90 def create_test_user(force=True):
91 91 print 'creating test user'
92 92 sa = get_session()
93 93
94 94 user = sa.query(User).filter(User.username == USER).scalar()
95 95
96 96 if force and user is not None:
97 97 print 'removing current user'
98 98 for repo in sa.query(Repository).filter(Repository.user == user).all():
99 99 sa.delete(repo)
100 100 sa.delete(user)
101 101 sa.commit()
102 102
103 103 if user is None or force:
104 104 print 'creating new one'
105 105 new_usr = User()
106 106 new_usr.username = USER
107 107 new_usr.password = get_crypt_password(PASS)
108 108 new_usr.email = 'mail@mail.com'
109 109 new_usr.name = 'test'
110 110 new_usr.lastname = 'lasttestname'
111 111 new_usr.active = True
112 112 new_usr.admin = True
113 113 sa.add(new_usr)
114 114 sa.commit()
115 115
116 116 print 'done'
117 117
118 118
119 119 def create_test_repo(force=True):
120 120 print 'creating test repo'
121 121 from rhodecode.model.repo import RepoModel
122 122 sa = get_session()
123 123
124 124 user = sa.query(User).filter(User.username == USER).scalar()
125 125 if user is None:
126 126 raise Exception('user not found')
127 127
128 128 repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
129 129
130 130 if repo is None:
131 131 print 'repo not found creating'
132 132
133 133 form_data = {'repo_name': HG_REPO,
134 134 'repo_type': 'hg',
135 135 'private':False,
136 136 'clone_uri': '' }
137 137 rm = RepoModel(sa)
138 138 rm.base_path = '/home/hg'
139 139 rm.create(form_data, user)
140 140
141 141 print 'done'
142 142
143 143
144 144 def set_anonymous_access(enable=True):
145 145 sa = get_session()
146 146 user = sa.query(User).filter(User.username == 'default').one()
147 147 user.active = enable
148 148 sa.add(user)
149 149 sa.commit()
150 150
151 151
152 152 def get_anonymous_access():
153 153 sa = get_session()
154 154 return sa.query(User).filter(User.username == 'default').one().active
155 155
156 156
157 157 #==============================================================================
158 158 # TESTS
159 159 #==============================================================================
160 160 def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
161 161 seq=None, backend='hg'):
162 162 cwd = path = jn(TESTS_TMP_PATH, repo)
163 163
164 164 if seq == None:
165 165 seq = _RandomNameSequence().next()
166 166
167 167 try:
168 168 shutil.rmtree(path, ignore_errors=True)
169 169 os.makedirs(path)
170 170 #print 'made dirs %s' % jn(path)
171 171 except OSError:
172 172 raise
173 173
174 174 clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
175 175 {'user': USER,
176 176 'pass': PASS,
177 177 'host': HOST,
178 178 'cloned_repo': repo, }
179 179
180 180 dest = path + seq
181 181 if method == 'pull':
182 182 stdout, stderr = Command(cwd).execute(backend, method, '--cwd', dest, clone_url)
183 183 else:
184 184 stdout, stderr = Command(cwd).execute(backend, method, clone_url, dest)
185 185 print stdout,'sdasdsadsa'
186 186 if not no_errors:
187 187 if backend == 'hg':
188 188 assert """adding file changes""" in stdout, 'no messages about cloning'
189 189 assert """abort""" not in stderr , 'got error from clone'
190 190 elif backend == 'git':
191 191 assert """Cloning into""" in stdout, 'no messages about cloning'
192 192
193 193 if __name__ == '__main__':
194 194 try:
195 195 create_test_user(force=False)
196 196 seq = None
197 197 import time
198 198
199 199 try:
200 200 METHOD = sys.argv[3]
201 201 except Exception:
202 202 pass
203 203
204 204 try:
205 205 backend = sys.argv[4]
206 206 except Exception:
207 207 backend = 'hg'
208 208
209 209 if METHOD == 'pull':
210 210 seq = _RandomNameSequence().next()
211 211 test_clone_with_credentials(repo=sys.argv[1], method='clone',
212 212 seq=seq, backend=backend)
213 213 s = time.time()
214 214 for i in range(1, int(sys.argv[2]) + 1):
215 215 print 'take', i
216 216 test_clone_with_credentials(repo=sys.argv[1], method=METHOD,
217 217 seq=seq, backend=backend)
218 218 print 'time taken %.3f' % (time.time() - s)
219 219 except Exception, e:
220 raise
221 220 sys.exit('stop on %s' % e)
General Comments 0
You need to be logged in to leave comments. Login now