##// END OF EJS Templates
Implemented locking for task, to prevent for running the same tasks,...
marcink -
r497:fb0c3af6 celery
parent child Browse files
Show More
@@ -1,9 +1,11 b''
1 from pylons_app.lib.pidlock import DaemonLock, LockHeld
1 2 from vcs.utils.lazy import LazyProperty
3 from decorator import decorator
2 4 import logging
3 5 import os
4 6 import sys
5 7 import traceback
6
8 from hashlib import md5
7 9 log = logging.getLogger(__name__)
8 10
9 11 class ResultWrapper(object):
@@ -20,10 +22,45 b' def run_task(task, *args, **kwargs):'
20 22 log.info('running task %s', t.task_id)
21 23 return t
22 24 except Exception, e:
25 print e
23 26 if e.errno == 111:
24 27 log.debug('Unnable to connect. Sync execution')
25 28 else:
26 29 log.error(traceback.format_exc())
27 30 #pure sync version
28 31 return ResultWrapper(task(*args, **kwargs))
32
33
34 class LockTask(object):
35 """LockTask decorator"""
29 36
37 def __init__(self, func):
38 self.func = func
39
40 def __call__(self, func):
41 return decorator(self.__wrapper, func)
42
43 def __wrapper(self, func, *fargs, **fkwargs):
44 params = []
45 params.extend(fargs)
46 params.extend(fkwargs.values())
47 lockkey = 'task_%s' % \
48 md5(str(self.func) + '-' + '-'.join(map(str, params))).hexdigest()
49 log.info('running task with lockkey %s', lockkey)
50 try:
51 l = DaemonLock(lockkey)
52 return func(*fargs, **fkwargs)
53 l.release()
54 except LockHeld:
55 log.info('LockHeld')
56 return 'Task with key %s already running' % lockkey
57
58
59
60
61
62
63
64
65
66
@@ -2,7 +2,7 b' from celery.decorators import task'
2 2 from celery.task.sets import subtask
3 3 from celeryconfig import PYLONS_CONFIG as config
4 4 from pylons.i18n.translation import _
5 from pylons_app.lib.celerylib import run_task
5 from pylons_app.lib.celerylib import run_task, LockTask
6 6 from pylons_app.lib.helpers import person
7 7 from pylons_app.lib.smtp_mailer import SmtpMailer
8 8 from pylons_app.lib.utils import OrderedDict
@@ -68,7 +68,7 b' def get_hg_ui_settings():'
68 68 @task
69 69 def whoosh_index(repo_location, full_index):
70 70 log = whoosh_index.get_logger()
71 from pylons_app.lib.indexers import DaemonLock
71 from pylons_app.lib.pidlock import DaemonLock
72 72 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
73 73 try:
74 74 l = DaemonLock()
@@ -80,7 +80,9 b' def whoosh_index(repo_location, full_ind'
80 80 log.info('LockHeld')
81 81 return 'LockHeld'
82 82
83
83 84 @task
85 @LockTask('get_commits_stats')
84 86 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
85 87 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
86 88
@@ -92,7 +94,7 b' def get_commits_stats(repo_name, ts_min_'
92 94 repo = MercurialRepository(repos_path + repo_name)
93 95
94 96 skip_date_limit = True
95 parse_limit = 500 #limit for single task changeset parsing
97 parse_limit = 350 #limit for single task changeset parsing
96 98 last_rev = 0
97 99 last_cs = None
98 100 timegetter = itemgetter('time')
@@ -205,7 +207,9 b' def get_commits_stats(repo_name, ts_min_'
205 207 log.error(traceback.format_exc())
206 208 sa.rollback()
207 209 return False
208
210
211 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
212
209 213 return True
210 214
211 215 @task
@@ -1,5 +1,4 b''
1 1 from os.path import dirname as dn, join as jn
2 from pidlock import LockHeld, DaemonLock
3 2 from pylons_app.config.environment import load_environment
4 3 from pylons_app.model.hg_model import HgModel
5 4 from shutil import rmtree
@@ -32,7 +32,7 b' from os.path import join as jn'
32 32 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
33 33 sys.path.append(project_path)
34 34
35 from pidlock import LockHeld, DaemonLock
35 from pylons_app.lib.pidlock import LockHeld, DaemonLock
36 36 from pylons_app.model.hg_model import HgModel
37 37 from pylons_app.lib.helpers import safe_unicode
38 38 from whoosh.index import create_in, open_dir
@@ -6,7 +6,7 b' class LockHeld(Exception):pass'
6 6
7 7
8 8 class DaemonLock(object):
9 '''daemon locking
9 """daemon locking
10 10 USAGE:
11 11 try:
12 12 l = lock()
@@ -14,7 +14,7 b' class DaemonLock(object):'
14 14 l.release()
15 15 except LockHeld:
16 16 sys.exit(1)
17 '''
17 """
18 18
19 19 def __init__(self, file=None, callbackfn=None,
20 20 desc='daemon lock', debug=False):
@@ -40,9 +40,9 b' class DaemonLock(object):'
40 40
41 41
42 42 def lock(self):
43 '''
43 """
44 44 locking function, if lock is present it will raise LockHeld exception
45 '''
45 """
46 46 lockname = '%s' % (os.getpid())
47 47
48 48 self.trylock()
@@ -75,9 +75,9 b' class DaemonLock(object):'
75 75
76 76
77 77 def release(self):
78 '''
78 """
79 79 releases the pid by removing the pidfile
80 '''
80 """
81 81 if self.callbackfn:
82 82 #execute callback function on release
83 83 if self.debug:
@@ -94,11 +94,11 b' class DaemonLock(object):'
94 94 pass
95 95
96 96 def makelock(self, lockname, pidfile):
97 '''
97 """
98 98 this function will make an actual lock
99 99 @param lockname: acctual pid of file
100 100 @param pidfile: the file to write the pid in
101 '''
101 """
102 102 if self.debug:
103 103 print 'creating a file %s and pid: %s' % (pidfile, lockname)
104 104 pidfile = open(self.pidfile, "wb")
@@ -374,9 +374,8 b' def create_test_index(repo_location, ful'
374 374 @param repo_location:
375 375 @param full_index:
376 376 """
377 from pylons_app.lib.indexers import daemon
378 377 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon
379 from pylons_app.lib.indexers.pidlock import DaemonLock, LockHeld
378 from pylons_app.lib.pidlock import DaemonLock, LockHeld
380 379 from pylons_app.lib.indexers import IDX_LOCATION
381 380 import shutil
382 381
@@ -29,26 +29,27 b''
29 29 <th>${_('Last commiter')}</th>
30 30 </tr>
31 31 </thead>
32 <tr class="parity0">
33 <td>
34 % if c.files_list.parent:
35 ${h.link_to('..',h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=c.files_list.parent.path),class_="browser-dir")}
36 %endif
37 </td>
38 <td></td>
39 <td></td>
40 <td></td>
41 <td></td>
42 </tr>
32
33 % if c.files_list.parent:
34 <tr class="parity0">
35 <td>
36 ${h.link_to('..',h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=c.files_list.parent.path),class_="browser-dir")}
37 </td>
38 <td></td>
39 <td></td>
40 <td></td>
41 <td></td>
42 <td></td>
43 </tr>
44 %endif
45
43 46 %for cnt,node in enumerate(c.files_list,1):
44 47 <tr class="parity${cnt%2}">
45 48 <td>
46 49 ${h.link_to(node.name,h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=node.path),class_=file_class(node))}
47 50 </td>
48 51 <td>
49 %if node.is_file():
50 ${h.format_byte_size(node.size,binary=True)}
51 %endif
52 ${h.format_byte_size(node.size,binary=True)}
52 53 </td>
53 54 <td>
54 55 %if node.is_file():
General Comments 0
You need to be logged in to leave comments. Login now