##// END OF EJS Templates
Fixed problems with unicode cache keys in celery
marcink -
r1641:cd1c21af default
parent child Browse files
Show More
@@ -1,407 +1,410 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.__init__
4 4 ~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Some simple helper functions
7 7
8 8 :created_on: Jan 5, 2011
9 9 :author: marcink
10 10 :copyright: (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27
28 28 def __get_lem():
29 29 from pygments import lexers
30 30 from string import lower
31 31 from collections import defaultdict
32 32
33 33 d = defaultdict(lambda: [])
34 34
35 35 def __clean(s):
36 36 s = s.lstrip('*')
37 37 s = s.lstrip('.')
38 38
39 39 if s.find('[') != -1:
40 40 exts = []
41 41 start, stop = s.find('['), s.find(']')
42 42
43 43 for suffix in s[start + 1:stop]:
44 44 exts.append(s[:s.find('[')] + suffix)
45 45 return map(lower, exts)
46 46 else:
47 47 return map(lower, [s])
48 48
49 49 for lx, t in sorted(lexers.LEXERS.items()):
50 50 m = map(__clean, t[-2])
51 51 if m:
52 52 m = reduce(lambda x, y: x + y, m)
53 53 for ext in m:
54 54 desc = lx.replace('Lexer', '')
55 55 d[ext].append(desc)
56 56
57 57 return dict(d)
58 58
59 59 # language map is also used by whoosh indexer, which for those specified
60 60 # extensions will index it's content
61 61 LANGUAGES_EXTENSIONS_MAP = __get_lem()
62 62
63 63 # Additional mappings that are not present in the pygments lexers
64 64 # NOTE: that this will overide any mappings in LANGUAGES_EXTENSIONS_MAP
65 65 ADDITIONAL_MAPPINGS = {'xaml': 'XAML'}
66 66
67 67 LANGUAGES_EXTENSIONS_MAP.update(ADDITIONAL_MAPPINGS)
68 68
69 69
70 70 def str2bool(_str):
71 71 """
72 72 returs True/False value from given string, it tries to translate the
73 73 string into boolean
74 74
75 75 :param _str: string value to translate into boolean
76 76 :rtype: boolean
77 77 :returns: boolean from given string
78 78 """
79 79 if _str is None:
80 80 return False
81 81 if _str in (True, False):
82 82 return _str
83 83 _str = str(_str).strip().lower()
84 84 return _str in ('t', 'true', 'y', 'yes', 'on', '1')
85 85
86 86
87 87 def convert_line_endings(line, mode):
88 88 """
89 89 Converts a given line "line end" accordingly to given mode
90 90
91 91 Available modes are::
92 92 0 - Unix
93 93 1 - Mac
94 94 2 - DOS
95 95
96 96 :param line: given line to convert
97 97 :param mode: mode to convert to
98 98 :rtype: str
99 99 :return: converted line according to mode
100 100 """
101 101 from string import replace
102 102
103 103 if mode == 0:
104 104 line = replace(line, '\r\n', '\n')
105 105 line = replace(line, '\r', '\n')
106 106 elif mode == 1:
107 107 line = replace(line, '\r\n', '\r')
108 108 line = replace(line, '\n', '\r')
109 109 elif mode == 2:
110 110 import re
111 111 line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
112 112 return line
113 113
114 114
115 115 def detect_mode(line, default):
116 116 """
117 117 Detects line break for given line, if line break couldn't be found
118 118 given default value is returned
119 119
120 120 :param line: str line
121 121 :param default: default
122 122 :rtype: int
123 123 :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
124 124 """
125 125 if line.endswith('\r\n'):
126 126 return 2
127 127 elif line.endswith('\n'):
128 128 return 0
129 129 elif line.endswith('\r'):
130 130 return 1
131 131 else:
132 132 return default
133 133
134 134
135 135 def generate_api_key(username, salt=None):
136 136 """
137 137 Generates unique API key for given username, if salt is not given
138 138 it'll be generated from some random string
139 139
140 140 :param username: username as string
141 141 :param salt: salt to hash generate KEY
142 142 :rtype: str
143 143 :returns: sha1 hash from username+salt
144 144 """
145 145 from tempfile import _RandomNameSequence
146 146 import hashlib
147 147
148 148 if salt is None:
149 149 salt = _RandomNameSequence().next()
150 150
151 151 return hashlib.sha1(username + salt).hexdigest()
152 152
153 153
154 154 def safe_unicode(str_, from_encoding='utf8'):
155 155 """
156 156 safe unicode function. Does few trick to turn str_ into unicode
157 157
158 158 In case of UnicodeDecode error we try to return it with encoding detected
159 159 by chardet library if it fails fallback to unicode with errors replaced
160 160
161 161 :param str_: string to decode
162 162 :rtype: unicode
163 163 :returns: unicode object
164 164 """
165 165 if isinstance(str_, unicode):
166 166 return str_
167 167
168 168 try:
169 169 return unicode(str_)
170 170 except UnicodeDecodeError:
171 171 pass
172 172
173 173 try:
174 174 return unicode(str_, from_encoding)
175 175 except UnicodeDecodeError:
176 176 pass
177 177
178 178 try:
179 179 import chardet
180 180 encoding = chardet.detect(str_)['encoding']
181 181 if encoding is None:
182 182 raise Exception()
183 183 return str_.decode(encoding)
184 184 except (ImportError, UnicodeDecodeError, Exception):
185 185 return unicode(str_, from_encoding, 'replace')
186 186
187 187 def safe_str(unicode_, to_encoding='utf8'):
188 188 """
189 189 safe str function. Does few trick to turn unicode_ into string
190 190
191 191 In case of UnicodeEncodeError we try to return it with encoding detected
192 192 by chardet library if it fails fallback to string with errors replaced
193 193
194 194 :param unicode_: unicode to encode
195 195 :rtype: str
196 196 :returns: str object
197 197 """
198
199 if not isinstance(unicode_, basestring):
200 return str(unicode_)
198 201
199 202 if isinstance(unicode_, str):
200 203 return unicode_
201 204
202 205 try:
203 206 return unicode_.encode(to_encoding)
204 207 except UnicodeEncodeError:
205 208 pass
206 209
207 210 try:
208 211 import chardet
209 212 encoding = chardet.detect(unicode_)['encoding']
210 213 print encoding
211 214 if encoding is None:
212 215 raise UnicodeEncodeError()
213 216
214 217 return unicode_.encode(encoding)
215 218 except (ImportError, UnicodeEncodeError):
216 219 return unicode_.encode(to_encoding, 'replace')
217 220
218 221 return safe_str
219 222
220 223
221 224
222 225 def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
223 226 """
224 227 Custom engine_from_config functions that makes sure we use NullPool for
225 228 file based sqlite databases. This prevents errors on sqlite. This only
226 229 applies to sqlalchemy versions < 0.7.0
227 230
228 231 """
229 232 import sqlalchemy
230 233 from sqlalchemy import engine_from_config as efc
231 234 import logging
232 235
233 236 if int(sqlalchemy.__version__.split('.')[1]) < 7:
234 237
235 238 # This solution should work for sqlalchemy < 0.7.0, and should use
236 239 # proxy=TimerProxy() for execution time profiling
237 240
238 241 from sqlalchemy.pool import NullPool
239 242 url = configuration[prefix + 'url']
240 243
241 244 if url.startswith('sqlite'):
242 245 kwargs.update({'poolclass': NullPool})
243 246 return efc(configuration, prefix, **kwargs)
244 247 else:
245 248 import time
246 249 from sqlalchemy import event
247 250 from sqlalchemy.engine import Engine
248 251
249 252 log = logging.getLogger('sqlalchemy.engine')
250 253 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
251 254 engine = efc(configuration, prefix, **kwargs)
252 255
253 256 def color_sql(sql):
254 257 COLOR_SEQ = "\033[1;%dm"
255 258 COLOR_SQL = YELLOW
256 259 normal = '\x1b[0m'
257 260 return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])
258 261
259 262 if configuration['debug']:
260 263 #attach events only for debug configuration
261 264
262 265 def before_cursor_execute(conn, cursor, statement,
263 266 parameters, context, executemany):
264 267 context._query_start_time = time.time()
265 268 log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
266 269
267 270
268 271 def after_cursor_execute(conn, cursor, statement,
269 272 parameters, context, executemany):
270 273 total = time.time() - context._query_start_time
271 274 log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
272 275
273 276 event.listen(engine, "before_cursor_execute",
274 277 before_cursor_execute)
275 278 event.listen(engine, "after_cursor_execute",
276 279 after_cursor_execute)
277 280
278 281 return engine
279 282
280 283
281 284 def age(curdate):
282 285 """
283 286 turns a datetime into an age string.
284 287
285 288 :param curdate: datetime object
286 289 :rtype: unicode
287 290 :returns: unicode words describing age
288 291 """
289 292
290 293 from datetime import datetime
291 294 from webhelpers.date import time_ago_in_words
292 295
293 296 _ = lambda s:s
294 297
295 298 if not curdate:
296 299 return ''
297 300
298 301 agescales = [(_(u"year"), 3600 * 24 * 365),
299 302 (_(u"month"), 3600 * 24 * 30),
300 303 (_(u"day"), 3600 * 24),
301 304 (_(u"hour"), 3600),
302 305 (_(u"minute"), 60),
303 306 (_(u"second"), 1), ]
304 307
305 308 age = datetime.now() - curdate
306 309 age_seconds = (age.days * agescales[2][1]) + age.seconds
307 310 pos = 1
308 311 for scale in agescales:
309 312 if scale[1] <= age_seconds:
310 313 if pos == 6:pos = 5
311 314 return '%s %s' % (time_ago_in_words(curdate,
312 315 agescales[pos][0]), _('ago'))
313 316 pos += 1
314 317
315 318 return _(u'just now')
316 319
317 320
318 321 def uri_filter(uri):
319 322 """
320 323 Removes user:password from given url string
321 324
322 325 :param uri:
323 326 :rtype: unicode
324 327 :returns: filtered list of strings
325 328 """
326 329 if not uri:
327 330 return ''
328 331
329 332 proto = ''
330 333
331 334 for pat in ('https://', 'http://'):
332 335 if uri.startswith(pat):
333 336 uri = uri[len(pat):]
334 337 proto = pat
335 338 break
336 339
337 340 # remove passwords and username
338 341 uri = uri[uri.find('@') + 1:]
339 342
340 343 # get the port
341 344 cred_pos = uri.find(':')
342 345 if cred_pos == -1:
343 346 host, port = uri, None
344 347 else:
345 348 host, port = uri[:cred_pos], uri[cred_pos + 1:]
346 349
347 350 return filter(None, [proto, host, port])
348 351
349 352
350 353 def credentials_filter(uri):
351 354 """
352 355 Returns a url with removed credentials
353 356
354 357 :param uri:
355 358 """
356 359
357 360 uri = uri_filter(uri)
358 361 #check if we have port
359 362 if len(uri) > 2 and uri[2]:
360 363 uri[2] = ':' + uri[2]
361 364
362 365 return ''.join(uri)
363 366
364 367 def get_changeset_safe(repo, rev):
365 368 """
366 369 Safe version of get_changeset if this changeset doesn't exists for a
367 370 repo it returns a Dummy one instead
368 371
369 372 :param repo:
370 373 :param rev:
371 374 """
372 375 from vcs.backends.base import BaseRepository
373 376 from vcs.exceptions import RepositoryError
374 377 if not isinstance(repo, BaseRepository):
375 378 raise Exception('You must pass an Repository '
376 379 'object as first argument got %s', type(repo))
377 380
378 381 try:
379 382 cs = repo.get_changeset(rev)
380 383 except RepositoryError:
381 384 from rhodecode.lib.utils import EmptyChangeset
382 385 cs = EmptyChangeset(requested_revision=rev)
383 386 return cs
384 387
385 388
386 389 def get_current_revision(quiet=False):
387 390 """
388 391 Returns tuple of (number, id) from repository containing this package
389 392 or None if repository could not be found.
390 393
391 394 :param quiet: prints error for fetching revision if True
392 395 """
393 396
394 397 try:
395 398 from vcs import get_repo
396 399 from vcs.utils.helpers import get_scm
397 400 repopath = os.path.join(os.path.dirname(__file__), '..', '..')
398 401 scm = get_scm(repopath)[0]
399 402 repo = get_repo(path=repopath, alias=scm)
400 403 tip = repo.get_changeset()
401 404 return (tip.revision, tip.short_id)
402 405 except Exception, err:
403 406 if not quiet:
404 407 print ("Cannot retrieve rhodecode's revision. Original error "
405 408 "was: %s" % err)
406 409 return None
407 410
@@ -1,109 +1,109 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.lib.celerylib.__init__
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 celery libs for RhodeCode
7 7
8 8 :created_on: Nov 27, 2010
9 9 :author: marcink
10 10 :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25
26 26 import os
27 27 import sys
28 28 import socket
29 29 import traceback
30 30 import logging
31 31 from os.path import dirname as dn, join as jn
32 32
33 33 from hashlib import md5
34 34 from decorator import decorator
35 35 from pylons import config
36 36
37 37 from vcs.utils.lazy import LazyProperty
38 38
39 from rhodecode.lib import str2bool
39 from rhodecode.lib import str2bool, safe_str
40 40 from rhodecode.lib.pidlock import DaemonLock, LockHeld
41 41
42 42 from celery.messaging import establish_connection
43 43
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47 try:
48 48 CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
49 49 except KeyError:
50 50 CELERY_ON = False
51 51
52 52
53 53 class ResultWrapper(object):
54 54 def __init__(self, task):
55 55 self.task = task
56 56
57 57 @LazyProperty
58 58 def result(self):
59 59 return self.task
60 60
61 61
62 62 def run_task(task, *args, **kwargs):
63 63 if CELERY_ON:
64 64 try:
65 65 t = task.apply_async(args=args, kwargs=kwargs)
66 66 log.info('running task %s:%s', t.task_id, task)
67 67 return t
68 68
69 69 except socket.error, e:
70 70 if isinstance(e, IOError) and e.errno == 111:
71 71 log.debug('Unable to connect to celeryd. Sync execution')
72 72 else:
73 73 log.error(traceback.format_exc())
74 74 except KeyError, e:
75 75 log.debug('Unable to connect to celeryd. Sync execution')
76 76 except Exception, e:
77 77 log.error(traceback.format_exc())
78 78
79 79 log.debug('executing task %s in sync mode', task)
80 80 return ResultWrapper(task(*args, **kwargs))
81 81
82 82
83 83 def __get_lockkey(func, *fargs, **fkwargs):
84 84 params = list(fargs)
85 85 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
86 86
87 87 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
88 88
89 89 lockkey = 'task_%s.lock' % \
90 md5(func_name + '-' + '-'.join(map(str, params))).hexdigest()
90 md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
91 91 return lockkey
92 92
93 93
94 94 def locked_task(func):
95 95 def __wrapper(func, *fargs, **fkwargs):
96 96 lockkey = __get_lockkey(func, *fargs, **fkwargs)
97 97 lockkey_path = config['here']
98 98
99 99 log.info('running task with lockkey %s', lockkey)
100 100 try:
101 101 l = DaemonLock(file_=jn(lockkey_path, lockkey))
102 102 ret = func(*fargs, **fkwargs)
103 103 l.release()
104 104 return ret
105 105 except LockHeld:
106 106 log.info('LockHeld')
107 107 return 'Task with key %s already running' % lockkey
108 108
109 109 return decorator(__wrapper, func)
General Comments 0
You need to be logged in to leave comments. Login now