##// END OF EJS Templates
Implemented locking for task, to prevent for running the same tasks,...
marcink -
r497:fb0c3af6 celery
parent child Browse files
Show More
@@ -1,9 +1,11 b''
1 from pylons_app.lib.pidlock import DaemonLock, LockHeld
1 from vcs.utils.lazy import LazyProperty
2 from vcs.utils.lazy import LazyProperty
3 from decorator import decorator
2 import logging
4 import logging
3 import os
5 import os
4 import sys
6 import sys
5 import traceback
7 import traceback
6
8 from hashlib import md5
7 log = logging.getLogger(__name__)
9 log = logging.getLogger(__name__)
8
10
9 class ResultWrapper(object):
11 class ResultWrapper(object):
@@ -20,6 +22,7 b' def run_task(task, *args, **kwargs):'
20 log.info('running task %s', t.task_id)
22 log.info('running task %s', t.task_id)
21 return t
23 return t
22 except Exception, e:
24 except Exception, e:
25 print e
23 if e.errno == 111:
26 if e.errno == 111:
24 log.debug('Unnable to connect. Sync execution')
27 log.debug('Unnable to connect. Sync execution')
25 else:
28 else:
@@ -27,3 +30,37 b' def run_task(task, *args, **kwargs):'
27 #pure sync version
30 #pure sync version
28 return ResultWrapper(task(*args, **kwargs))
31 return ResultWrapper(task(*args, **kwargs))
29
32
33
34 class LockTask(object):
35 """LockTask decorator"""
36
37 def __init__(self, func):
38 self.func = func
39
40 def __call__(self, func):
41 return decorator(self.__wrapper, func)
42
43 def __wrapper(self, func, *fargs, **fkwargs):
44 params = []
45 params.extend(fargs)
46 params.extend(fkwargs.values())
47 lockkey = 'task_%s' % \
48 md5(str(self.func) + '-' + '-'.join(map(str, params))).hexdigest()
49 log.info('running task with lockkey %s', lockkey)
50 try:
51 l = DaemonLock(lockkey)
52 return func(*fargs, **fkwargs)
53 l.release()
54 except LockHeld:
55 log.info('LockHeld')
56 return 'Task with key %s already running' % lockkey
57
58
59
60
61
62
63
64
65
66
@@ -2,7 +2,7 b' from celery.decorators import task'
2 from celery.task.sets import subtask
2 from celery.task.sets import subtask
3 from celeryconfig import PYLONS_CONFIG as config
3 from celeryconfig import PYLONS_CONFIG as config
4 from pylons.i18n.translation import _
4 from pylons.i18n.translation import _
5 from pylons_app.lib.celerylib import run_task
5 from pylons_app.lib.celerylib import run_task, LockTask
6 from pylons_app.lib.helpers import person
6 from pylons_app.lib.helpers import person
7 from pylons_app.lib.smtp_mailer import SmtpMailer
7 from pylons_app.lib.smtp_mailer import SmtpMailer
8 from pylons_app.lib.utils import OrderedDict
8 from pylons_app.lib.utils import OrderedDict
@@ -68,7 +68,7 b' def get_hg_ui_settings():'
68 @task
68 @task
69 def whoosh_index(repo_location, full_index):
69 def whoosh_index(repo_location, full_index):
70 log = whoosh_index.get_logger()
70 log = whoosh_index.get_logger()
71 from pylons_app.lib.indexers import DaemonLock
71 from pylons_app.lib.pidlock import DaemonLock
72 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
72 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
73 try:
73 try:
74 l = DaemonLock()
74 l = DaemonLock()
@@ -80,7 +80,9 b' def whoosh_index(repo_location, full_ind'
80 log.info('LockHeld')
80 log.info('LockHeld')
81 return 'LockHeld'
81 return 'LockHeld'
82
82
83
83 @task
84 @task
85 @LockTask('get_commits_stats')
84 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
86 def get_commits_stats(repo_name, ts_min_y, ts_max_y):
85 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
87 author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
86
88
@@ -92,7 +94,7 b' def get_commits_stats(repo_name, ts_min_'
92 repo = MercurialRepository(repos_path + repo_name)
94 repo = MercurialRepository(repos_path + repo_name)
93
95
94 skip_date_limit = True
96 skip_date_limit = True
95 parse_limit = 500 #limit for single task changeset parsing
97 parse_limit = 350 #limit for single task changeset parsing
96 last_rev = 0
98 last_rev = 0
97 last_cs = None
99 last_cs = None
98 timegetter = itemgetter('time')
100 timegetter = itemgetter('time')
@@ -206,6 +208,8 b' def get_commits_stats(repo_name, ts_min_'
206 sa.rollback()
208 sa.rollback()
207 return False
209 return False
208
210
211 run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
212
209 return True
213 return True
210
214
211 @task
215 @task
@@ -1,5 +1,4 b''
1 from os.path import dirname as dn, join as jn
1 from os.path import dirname as dn, join as jn
2 from pidlock import LockHeld, DaemonLock
3 from pylons_app.config.environment import load_environment
2 from pylons_app.config.environment import load_environment
4 from pylons_app.model.hg_model import HgModel
3 from pylons_app.model.hg_model import HgModel
5 from shutil import rmtree
4 from shutil import rmtree
@@ -32,7 +32,7 b' from os.path import join as jn'
32 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
32 project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
33 sys.path.append(project_path)
33 sys.path.append(project_path)
34
34
35 from pidlock import LockHeld, DaemonLock
35 from pylons_app.lib.pidlock import LockHeld, DaemonLock
36 from pylons_app.model.hg_model import HgModel
36 from pylons_app.model.hg_model import HgModel
37 from pylons_app.lib.helpers import safe_unicode
37 from pylons_app.lib.helpers import safe_unicode
38 from whoosh.index import create_in, open_dir
38 from whoosh.index import create_in, open_dir
@@ -6,7 +6,7 b' class LockHeld(Exception):pass'
6
6
7
7
8 class DaemonLock(object):
8 class DaemonLock(object):
9 '''daemon locking
9 """daemon locking
10 USAGE:
10 USAGE:
11 try:
11 try:
12 l = lock()
12 l = lock()
@@ -14,7 +14,7 b' class DaemonLock(object):'
14 l.release()
14 l.release()
15 except LockHeld:
15 except LockHeld:
16 sys.exit(1)
16 sys.exit(1)
17 '''
17 """
18
18
19 def __init__(self, file=None, callbackfn=None,
19 def __init__(self, file=None, callbackfn=None,
20 desc='daemon lock', debug=False):
20 desc='daemon lock', debug=False):
@@ -40,9 +40,9 b' class DaemonLock(object):'
40
40
41
41
42 def lock(self):
42 def lock(self):
43 '''
43 """
44 locking function, if lock is present it will raise LockHeld exception
44 locking function, if lock is present it will raise LockHeld exception
45 '''
45 """
46 lockname = '%s' % (os.getpid())
46 lockname = '%s' % (os.getpid())
47
47
48 self.trylock()
48 self.trylock()
@@ -75,9 +75,9 b' class DaemonLock(object):'
75
75
76
76
77 def release(self):
77 def release(self):
78 '''
78 """
79 releases the pid by removing the pidfile
79 releases the pid by removing the pidfile
80 '''
80 """
81 if self.callbackfn:
81 if self.callbackfn:
82 #execute callback function on release
82 #execute callback function on release
83 if self.debug:
83 if self.debug:
@@ -94,11 +94,11 b' class DaemonLock(object):'
94 pass
94 pass
95
95
96 def makelock(self, lockname, pidfile):
96 def makelock(self, lockname, pidfile):
97 '''
97 """
98 this function will make an actual lock
98 this function will make an actual lock
99 @param lockname: acctual pid of file
99 @param lockname: acctual pid of file
100 @param pidfile: the file to write the pid in
100 @param pidfile: the file to write the pid in
101 '''
101 """
102 if self.debug:
102 if self.debug:
103 print 'creating a file %s and pid: %s' % (pidfile, lockname)
103 print 'creating a file %s and pid: %s' % (pidfile, lockname)
104 pidfile = open(self.pidfile, "wb")
104 pidfile = open(self.pidfile, "wb")
@@ -374,9 +374,8 b' def create_test_index(repo_location, ful'
374 @param repo_location:
374 @param repo_location:
375 @param full_index:
375 @param full_index:
376 """
376 """
377 from pylons_app.lib.indexers import daemon
378 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon
377 from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon
379 from pylons_app.lib.indexers.pidlock import DaemonLock, LockHeld
378 from pylons_app.lib.pidlock import DaemonLock, LockHeld
380 from pylons_app.lib.indexers import IDX_LOCATION
379 from pylons_app.lib.indexers import IDX_LOCATION
381 import shutil
380 import shutil
382
381
@@ -29,26 +29,27 b''
29 <th>${_('Last commiter')}</th>
29 <th>${_('Last commiter')}</th>
30 </tr>
30 </tr>
31 </thead>
31 </thead>
32
33 % if c.files_list.parent:
32 <tr class="parity0">
34 <tr class="parity0">
33 <td>
35 <td>
34 % if c.files_list.parent:
35 ${h.link_to('..',h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=c.files_list.parent.path),class_="browser-dir")}
36 ${h.link_to('..',h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=c.files_list.parent.path),class_="browser-dir")}
36 %endif
37 </td>
37 </td>
38 <td></td>
38 <td></td>
39 <td></td>
39 <td></td>
40 <td></td>
40 <td></td>
41 <td></td>
41 <td></td>
42 <td></td>
42 </tr>
43 </tr>
44 %endif
45
43 %for cnt,node in enumerate(c.files_list,1):
46 %for cnt,node in enumerate(c.files_list,1):
44 <tr class="parity${cnt%2}">
47 <tr class="parity${cnt%2}">
45 <td>
48 <td>
46 ${h.link_to(node.name,h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=node.path),class_=file_class(node))}
49 ${h.link_to(node.name,h.url('files_home',repo_name=c.repo_name,revision=c.cur_rev,f_path=node.path),class_=file_class(node))}
47 </td>
50 </td>
48 <td>
51 <td>
49 %if node.is_file():
50 ${h.format_byte_size(node.size,binary=True)}
52 ${h.format_byte_size(node.size,binary=True)}
51 %endif
52 </td>
53 </td>
53 <td>
54 <td>
54 %if node.is_file():
55 %if node.is_file():
General Comments 0
You need to be logged in to leave comments. Login now