##// END OF EJS Templates
celery: create custom celery task class which loads context/threadlocals...
dan -
r576:ffa50597 default
parent child Browse files
Show More
@@ -1,147 +1,228 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 celery libs for RhodeCode
21 celery libs for RhodeCode
22 """
22 """
23
23
24
24
25 import pylons
25 import socket
26 import socket
26 import logging
27 import logging
27
28
28 import rhodecode
29 import rhodecode
29
30
30 from os.path import join as jn
31 from os.path import join as jn
31 from pylons import config
32 from pylons import config
33 from celery.task import Task
34 from pyramid.request import Request
35 from pyramid.scripting import prepare
36 from pyramid.threadlocal import get_current_request
32
37
33 from decorator import decorator
38 from decorator import decorator
34
39
35 from zope.cachedescriptors.property import Lazy as LazyProperty
40 from zope.cachedescriptors.property import Lazy as LazyProperty
36
41
37 from rhodecode.config import utils
42 from rhodecode.config import utils
38 from rhodecode.lib.utils2 import safe_str, md5_safe, aslist
43 from rhodecode.lib.utils2 import (
44 safe_str, md5_safe, aslist, get_routes_generator_for_server_url,
45 get_server_url)
39 from rhodecode.lib.pidlock import DaemonLock, LockHeld
46 from rhodecode.lib.pidlock import DaemonLock, LockHeld
40 from rhodecode.lib.vcs import connect_vcs
47 from rhodecode.lib.vcs import connect_vcs
41 from rhodecode.model import meta
48 from rhodecode.model import meta
49 from rhodecode.lib.auth import AuthUser
42
50
43 log = logging.getLogger(__name__)
51 log = logging.getLogger(__name__)
44
52
45
53
46 class ResultWrapper(object):
54 class ResultWrapper(object):
47 def __init__(self, task):
55 def __init__(self, task):
48 self.task = task
56 self.task = task
49
57
50 @LazyProperty
58 @LazyProperty
51 def result(self):
59 def result(self):
52 return self.task
60 return self.task
53
61
54
62
63 class RhodecodeCeleryTask(Task):
64 """
65 This is a celery task which will create a rhodecode app instance context
66 for the task, patch pyramid + pylons threadlocals with the original request
67 that created the task and also add the user to the context.
68
69 This class as a whole should be removed once the pylons port is complete
70 and a pyramid only solution for celery is implemented as per issue #4139
71 """
72
73 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
74 link=None, link_error=None, **options):
75 """ queue the job to run (we are in web request context here) """
76
77 request = get_current_request()
78
79 # we hook into kwargs since it is the only way to pass our data to the
80 # celery worker in celery 2.2
81 kwargs.update({
82 '_rhodecode_proxy_data': {
83 'environ': {
84 'PATH_INFO': request.environ['PATH_INFO'],
85 'SCRIPT_NAME': request.environ['SCRIPT_NAME'],
86 'HTTP_HOST': request.environ.get('HTTP_HOST',
87 request.environ['SERVER_NAME']),
88 'SERVER_NAME': request.environ['SERVER_NAME'],
89 'SERVER_PORT': request.environ['SERVER_PORT'],
90 'wsgi.url_scheme': request.environ['wsgi.url_scheme'],
91 },
92 'auth_user': {
93 'ip_addr': request.user.ip_addr,
94 'user_id': request.user.user_id
95 },
96 }
97 })
98 return super(RhodecodeCeleryTask, self).apply_async(
99 args, kwargs, task_id, producer, link, link_error, **options)
100
101 def __call__(self, *args, **kwargs):
102 """ rebuild the context and then run task on celery worker """
103 proxy_data = kwargs.pop('_rhodecode_proxy_data', {})
104
105 if not proxy_data:
106 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
107
108 log.debug('using celery proxy data to run task: %r', proxy_data)
109
110 from rhodecode.config.routing import make_map
111 from rhodecode.config.middleware import make_pyramid_app
112
113 # TODO: this can be done once per worker versus per task
114 pyramid_app = make_pyramid_app(config, **config['app_conf'])
115
116 request = Request.blank('/', environ=proxy_data['environ'])
117 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
118 ip_addr=proxy_data['auth_user']['ip_addr'])
119
120 pyramid_request = prepare(request) # set pyramid threadlocal request
121
122 # pylons routing
123 if not rhodecode.CONFIG.get('routes.map'):
124 rhodecode.CONFIG['routes.map'] = make_map(config)
125 pylons.url._push_object(get_routes_generator_for_server_url(
126 get_server_url(request.environ)
127 ))
128
129 try:
130 return super(RhodecodeCeleryTask, self).__call__(*args, **kwargs)
131 finally:
132 pyramid_request['closer']()
133 pylons.url._pop_object()
134
135
55 def run_task(task, *args, **kwargs):
136 def run_task(task, *args, **kwargs):
56 if rhodecode.CELERY_ENABLED:
137 if rhodecode.CELERY_ENABLED:
57 celery_is_up = False
138 celery_is_up = False
58 try:
139 try:
59 t = task.apply_async(args=args, kwargs=kwargs)
140 t = task.apply_async(args=args, kwargs=kwargs)
60 log.info('running task %s:%s', t.task_id, task)
141 log.info('running task %s:%s', t.task_id, task)
61 celery_is_up = True
142 celery_is_up = True
62 return t
143 return t
63
144
64 except socket.error as e:
145 except socket.error as e:
65 if isinstance(e, IOError) and e.errno == 111:
146 if isinstance(e, IOError) and e.errno == 111:
66 log.error('Unable to connect to celeryd. Sync execution')
147 log.error('Unable to connect to celeryd. Sync execution')
67 else:
148 else:
68 log.exception("Exception while connecting to celeryd.")
149 log.exception("Exception while connecting to celeryd.")
69 except KeyError as e:
150 except KeyError as e:
70 log.error('Unable to connect to celeryd. Sync execution')
151 log.error('Unable to connect to celeryd. Sync execution')
71 except Exception as e:
152 except Exception as e:
72 log.exception(
153 log.exception(
73 "Exception while trying to run task asynchronous. "
154 "Exception while trying to run task asynchronous. "
74 "Fallback to sync execution.")
155 "Fallback to sync execution.")
75
156
76 # keep in mind there maybe a subtle race condition where something
157 # keep in mind there maybe a subtle race condition where something
77 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
158 # depending on rhodecode.CELERY_ENABLED such as @dbsession decorator
78 # will see CELERY_ENABLED as True before this has a chance to set False
159 # will see CELERY_ENABLED as True before this has a chance to set False
79 rhodecode.CELERY_ENABLED = celery_is_up
160 rhodecode.CELERY_ENABLED = celery_is_up
80 else:
161 else:
81 log.debug('executing task %s in sync mode', task)
162 log.debug('executing task %s in sync mode', task)
82 return ResultWrapper(task(*args, **kwargs))
163 return ResultWrapper(task(*args, **kwargs))
83
164
84
165
85 def __get_lockkey(func, *fargs, **fkwargs):
166 def __get_lockkey(func, *fargs, **fkwargs):
86 params = list(fargs)
167 params = list(fargs)
87 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
168 params.extend(['%s-%s' % ar for ar in fkwargs.items()])
88
169
89 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
170 func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
90 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
171 _lock_key = func_name + '-' + '-'.join(map(safe_str, params))
91 return 'task_%s.lock' % (md5_safe(_lock_key),)
172 return 'task_%s.lock' % (md5_safe(_lock_key),)
92
173
93
174
94 def locked_task(func):
175 def locked_task(func):
95 def __wrapper(func, *fargs, **fkwargs):
176 def __wrapper(func, *fargs, **fkwargs):
96 lockkey = __get_lockkey(func, *fargs, **fkwargs)
177 lockkey = __get_lockkey(func, *fargs, **fkwargs)
97 lockkey_path = config['app_conf']['cache_dir']
178 lockkey_path = config['app_conf']['cache_dir']
98
179
99 log.info('running task with lockkey %s' % lockkey)
180 log.info('running task with lockkey %s' % lockkey)
100 try:
181 try:
101 l = DaemonLock(file_=jn(lockkey_path, lockkey))
182 l = DaemonLock(file_=jn(lockkey_path, lockkey))
102 ret = func(*fargs, **fkwargs)
183 ret = func(*fargs, **fkwargs)
103 l.release()
184 l.release()
104 return ret
185 return ret
105 except LockHeld:
186 except LockHeld:
106 log.info('LockHeld')
187 log.info('LockHeld')
107 return 'Task with key %s already running' % lockkey
188 return 'Task with key %s already running' % lockkey
108
189
109 return decorator(__wrapper, func)
190 return decorator(__wrapper, func)
110
191
111
192
112 def get_session():
193 def get_session():
113 if rhodecode.CELERY_ENABLED:
194 if rhodecode.CELERY_ENABLED:
114 utils.initialize_database(config)
195 utils.initialize_database(config)
115 sa = meta.Session()
196 sa = meta.Session()
116 return sa
197 return sa
117
198
118
199
119 def dbsession(func):
200 def dbsession(func):
120 def __wrapper(func, *fargs, **fkwargs):
201 def __wrapper(func, *fargs, **fkwargs):
121 try:
202 try:
122 ret = func(*fargs, **fkwargs)
203 ret = func(*fargs, **fkwargs)
123 return ret
204 return ret
124 finally:
205 finally:
125 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
206 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
126 meta.Session.remove()
207 meta.Session.remove()
127
208
128 return decorator(__wrapper, func)
209 return decorator(__wrapper, func)
129
210
130
211
131 def vcsconnection(func):
212 def vcsconnection(func):
132 def __wrapper(func, *fargs, **fkwargs):
213 def __wrapper(func, *fargs, **fkwargs):
133 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
214 if rhodecode.CELERY_ENABLED and not rhodecode.CELERY_EAGER:
134 backends = config['vcs.backends'] = aslist(
215 backends = config['vcs.backends'] = aslist(
135 config.get('vcs.backends', 'hg,git'), sep=',')
216 config.get('vcs.backends', 'hg,git'), sep=',')
136 for alias in rhodecode.BACKENDS.keys():
217 for alias in rhodecode.BACKENDS.keys():
137 if alias not in backends:
218 if alias not in backends:
138 del rhodecode.BACKENDS[alias]
219 del rhodecode.BACKENDS[alias]
139 utils.configure_pyro4(config)
220 utils.configure_pyro4(config)
140 utils.configure_vcs(config)
221 utils.configure_vcs(config)
141 connect_vcs(
222 connect_vcs(
142 config['vcs.server'],
223 config['vcs.server'],
143 utils.get_vcs_server_protocol(config))
224 utils.get_vcs_server_protocol(config))
144 ret = func(*fargs, **fkwargs)
225 ret = func(*fargs, **fkwargs)
145 return ret
226 return ret
146
227
147 return decorator(__wrapper, func)
228 return decorator(__wrapper, func)
@@ -1,284 +1,284 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2012-2016 RhodeCode GmbH
3 # Copyright (C) 2012-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 RhodeCode task modules, containing all task that suppose to be run
22 RhodeCode task modules, containing all task that suppose to be run
23 by celery daemon
23 by celery daemon
24 """
24 """
25
25
26
26
27 import os
27 import os
28 import logging
28 import logging
29
29
30 from celery.task import task
30 from celery.task import task
31 from pylons import config
31 from pylons import config
32
32
33 import rhodecode
33 import rhodecode
34 from rhodecode.lib.celerylib import (
34 from rhodecode.lib.celerylib import (
35 run_task, dbsession, __get_lockkey, LockHeld, DaemonLock,
35 run_task, dbsession, __get_lockkey, LockHeld, DaemonLock,
36 get_session, vcsconnection)
36 get_session, vcsconnection, RhodecodeCeleryTask)
37 from rhodecode.lib.hooks_base import log_create_repository
37 from rhodecode.lib.hooks_base import log_create_repository
38 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
38 from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
39 from rhodecode.lib.utils import add_cache, action_logger
39 from rhodecode.lib.utils import add_cache, action_logger
40 from rhodecode.lib.utils2 import safe_int, str2bool
40 from rhodecode.lib.utils2 import safe_int, str2bool
41 from rhodecode.model.db import Repository, User
41 from rhodecode.model.db import Repository, User
42
42
43
43
44 add_cache(config) # pragma: no cover
44 add_cache(config) # pragma: no cover
45
45
46
46
47 def get_logger(cls):
47 def get_logger(cls):
48 if rhodecode.CELERY_ENABLED:
48 if rhodecode.CELERY_ENABLED:
49 try:
49 try:
50 log = cls.get_logger()
50 log = cls.get_logger()
51 except Exception:
51 except Exception:
52 log = logging.getLogger(__name__)
52 log = logging.getLogger(__name__)
53 else:
53 else:
54 log = logging.getLogger(__name__)
54 log = logging.getLogger(__name__)
55
55
56 return log
56 return log
57
57
58
58
59 @task(ignore_result=True)
59 @task(ignore_result=True, base=RhodecodeCeleryTask)
60 @dbsession
60 @dbsession
61 def send_email(recipients, subject, body='', html_body='', email_config=None):
61 def send_email(recipients, subject, body='', html_body='', email_config=None):
62 """
62 """
63 Sends an email with defined parameters from the .ini files.
63 Sends an email with defined parameters from the .ini files.
64
64
65 :param recipients: list of recipients, it this is empty the defined email
65 :param recipients: list of recipients, it this is empty the defined email
66 address from field 'email_to' is used instead
66 address from field 'email_to' is used instead
67 :param subject: subject of the mail
67 :param subject: subject of the mail
68 :param body: body of the mail
68 :param body: body of the mail
69 :param html_body: html version of body
69 :param html_body: html version of body
70 """
70 """
71 log = get_logger(send_email)
71 log = get_logger(send_email)
72
72
73 email_config = email_config or config
73 email_config = email_config or config
74 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
74 subject = "%s %s" % (email_config.get('email_prefix', ''), subject)
75 if not recipients:
75 if not recipients:
76 # if recipients are not defined we send to email_config + all admins
76 # if recipients are not defined we send to email_config + all admins
77 admins = [
77 admins = [
78 u.email for u in User.query().filter(User.admin == True).all()]
78 u.email for u in User.query().filter(User.admin == True).all()]
79 recipients = [email_config.get('email_to')] + admins
79 recipients = [email_config.get('email_to')] + admins
80
80
81 mail_server = email_config.get('smtp_server') or None
81 mail_server = email_config.get('smtp_server') or None
82 if mail_server is None:
82 if mail_server is None:
83 log.error("SMTP server information missing. Sending email failed. "
83 log.error("SMTP server information missing. Sending email failed. "
84 "Make sure that `smtp_server` variable is configured "
84 "Make sure that `smtp_server` variable is configured "
85 "inside the .ini file")
85 "inside the .ini file")
86 return False
86 return False
87
87
88 mail_from = email_config.get('app_email_from', 'RhodeCode')
88 mail_from = email_config.get('app_email_from', 'RhodeCode')
89 user = email_config.get('smtp_username')
89 user = email_config.get('smtp_username')
90 passwd = email_config.get('smtp_password')
90 passwd = email_config.get('smtp_password')
91 mail_port = email_config.get('smtp_port')
91 mail_port = email_config.get('smtp_port')
92 tls = str2bool(email_config.get('smtp_use_tls'))
92 tls = str2bool(email_config.get('smtp_use_tls'))
93 ssl = str2bool(email_config.get('smtp_use_ssl'))
93 ssl = str2bool(email_config.get('smtp_use_ssl'))
94 debug = str2bool(email_config.get('debug'))
94 debug = str2bool(email_config.get('debug'))
95 smtp_auth = email_config.get('smtp_auth')
95 smtp_auth = email_config.get('smtp_auth')
96
96
97 try:
97 try:
98 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
98 m = SmtpMailer(mail_from, user, passwd, mail_server, smtp_auth,
99 mail_port, ssl, tls, debug=debug)
99 mail_port, ssl, tls, debug=debug)
100 m.send(recipients, subject, body, html_body)
100 m.send(recipients, subject, body, html_body)
101 except Exception:
101 except Exception:
102 log.exception('Mail sending failed')
102 log.exception('Mail sending failed')
103 return False
103 return False
104 return True
104 return True
105
105
106
106
107 @task(ignore_result=False)
107 @task(ignore_result=True, base=RhodecodeCeleryTask)
108 @dbsession
108 @dbsession
109 @vcsconnection
109 @vcsconnection
110 def create_repo(form_data, cur_user):
110 def create_repo(form_data, cur_user):
111 from rhodecode.model.repo import RepoModel
111 from rhodecode.model.repo import RepoModel
112 from rhodecode.model.user import UserModel
112 from rhodecode.model.user import UserModel
113 from rhodecode.model.settings import SettingsModel
113 from rhodecode.model.settings import SettingsModel
114
114
115 log = get_logger(create_repo)
115 log = get_logger(create_repo)
116 DBS = get_session()
116 DBS = get_session()
117
117
118 cur_user = UserModel(DBS)._get_user(cur_user)
118 cur_user = UserModel(DBS)._get_user(cur_user)
119 owner = cur_user
119 owner = cur_user
120
120
121 repo_name = form_data['repo_name']
121 repo_name = form_data['repo_name']
122 repo_name_full = form_data['repo_name_full']
122 repo_name_full = form_data['repo_name_full']
123 repo_type = form_data['repo_type']
123 repo_type = form_data['repo_type']
124 description = form_data['repo_description']
124 description = form_data['repo_description']
125 private = form_data['repo_private']
125 private = form_data['repo_private']
126 clone_uri = form_data.get('clone_uri')
126 clone_uri = form_data.get('clone_uri')
127 repo_group = safe_int(form_data['repo_group'])
127 repo_group = safe_int(form_data['repo_group'])
128 landing_rev = form_data['repo_landing_rev']
128 landing_rev = form_data['repo_landing_rev']
129 copy_fork_permissions = form_data.get('copy_permissions')
129 copy_fork_permissions = form_data.get('copy_permissions')
130 copy_group_permissions = form_data.get('repo_copy_permissions')
130 copy_group_permissions = form_data.get('repo_copy_permissions')
131 fork_of = form_data.get('fork_parent_id')
131 fork_of = form_data.get('fork_parent_id')
132 state = form_data.get('repo_state', Repository.STATE_PENDING)
132 state = form_data.get('repo_state', Repository.STATE_PENDING)
133
133
134 # repo creation defaults, private and repo_type are filled in form
134 # repo creation defaults, private and repo_type are filled in form
135 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
135 defs = SettingsModel().get_default_repo_settings(strip_prefix=True)
136 enable_statistics = form_data.get(
136 enable_statistics = form_data.get(
137 'enable_statistics', defs.get('repo_enable_statistics'))
137 'enable_statistics', defs.get('repo_enable_statistics'))
138 enable_locking = form_data.get(
138 enable_locking = form_data.get(
139 'enable_locking', defs.get('repo_enable_locking'))
139 'enable_locking', defs.get('repo_enable_locking'))
140 enable_downloads = form_data.get(
140 enable_downloads = form_data.get(
141 'enable_downloads', defs.get('repo_enable_downloads'))
141 'enable_downloads', defs.get('repo_enable_downloads'))
142
142
143 try:
143 try:
144 RepoModel(DBS)._create_repo(
144 RepoModel(DBS)._create_repo(
145 repo_name=repo_name_full,
145 repo_name=repo_name_full,
146 repo_type=repo_type,
146 repo_type=repo_type,
147 description=description,
147 description=description,
148 owner=owner,
148 owner=owner,
149 private=private,
149 private=private,
150 clone_uri=clone_uri,
150 clone_uri=clone_uri,
151 repo_group=repo_group,
151 repo_group=repo_group,
152 landing_rev=landing_rev,
152 landing_rev=landing_rev,
153 fork_of=fork_of,
153 fork_of=fork_of,
154 copy_fork_permissions=copy_fork_permissions,
154 copy_fork_permissions=copy_fork_permissions,
155 copy_group_permissions=copy_group_permissions,
155 copy_group_permissions=copy_group_permissions,
156 enable_statistics=enable_statistics,
156 enable_statistics=enable_statistics,
157 enable_locking=enable_locking,
157 enable_locking=enable_locking,
158 enable_downloads=enable_downloads,
158 enable_downloads=enable_downloads,
159 state=state
159 state=state
160 )
160 )
161
161
162 action_logger(cur_user, 'user_created_repo',
162 action_logger(cur_user, 'user_created_repo',
163 repo_name_full, '', DBS)
163 repo_name_full, '', DBS)
164 DBS.commit()
164 DBS.commit()
165
165
166 # now create this repo on Filesystem
166 # now create this repo on Filesystem
167 RepoModel(DBS)._create_filesystem_repo(
167 RepoModel(DBS)._create_filesystem_repo(
168 repo_name=repo_name,
168 repo_name=repo_name,
169 repo_type=repo_type,
169 repo_type=repo_type,
170 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
170 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
171 clone_uri=clone_uri,
171 clone_uri=clone_uri,
172 )
172 )
173 repo = Repository.get_by_repo_name(repo_name_full)
173 repo = Repository.get_by_repo_name(repo_name_full)
174 log_create_repository(created_by=owner.username, **repo.get_dict())
174 log_create_repository(created_by=owner.username, **repo.get_dict())
175
175
176 # update repo commit caches initially
176 # update repo commit caches initially
177 repo.update_commit_cache()
177 repo.update_commit_cache()
178
178
179 # set new created state
179 # set new created state
180 repo.set_state(Repository.STATE_CREATED)
180 repo.set_state(Repository.STATE_CREATED)
181 DBS.commit()
181 DBS.commit()
182 except Exception as e:
182 except Exception as e:
183 log.warning('Exception %s occurred when creating repository, '
183 log.warning('Exception %s occurred when creating repository, '
184 'doing cleanup...', e)
184 'doing cleanup...', e)
185 # rollback things manually !
185 # rollback things manually !
186 repo = Repository.get_by_repo_name(repo_name_full)
186 repo = Repository.get_by_repo_name(repo_name_full)
187 if repo:
187 if repo:
188 Repository.delete(repo.repo_id)
188 Repository.delete(repo.repo_id)
189 DBS.commit()
189 DBS.commit()
190 RepoModel(DBS)._delete_filesystem_repo(repo)
190 RepoModel(DBS)._delete_filesystem_repo(repo)
191 raise
191 raise
192
192
193 # it's an odd fix to make celery fail task when exception occurs
193 # it's an odd fix to make celery fail task when exception occurs
194 def on_failure(self, *args, **kwargs):
194 def on_failure(self, *args, **kwargs):
195 pass
195 pass
196
196
197 return True
197 return True
198
198
199
199
200 @task(ignore_result=False)
200 @task(ignore_result=True, base=RhodecodeCeleryTask)
201 @dbsession
201 @dbsession
202 @vcsconnection
202 @vcsconnection
203 def create_repo_fork(form_data, cur_user):
203 def create_repo_fork(form_data, cur_user):
204 """
204 """
205 Creates a fork of repository using internal VCS methods
205 Creates a fork of repository using internal VCS methods
206
206
207 :param form_data:
207 :param form_data:
208 :param cur_user:
208 :param cur_user:
209 """
209 """
210 from rhodecode.model.repo import RepoModel
210 from rhodecode.model.repo import RepoModel
211 from rhodecode.model.user import UserModel
211 from rhodecode.model.user import UserModel
212
212
213 log = get_logger(create_repo_fork)
213 log = get_logger(create_repo_fork)
214 DBS = get_session()
214 DBS = get_session()
215
215
216 cur_user = UserModel(DBS)._get_user(cur_user)
216 cur_user = UserModel(DBS)._get_user(cur_user)
217 owner = cur_user
217 owner = cur_user
218
218
219 repo_name = form_data['repo_name'] # fork in this case
219 repo_name = form_data['repo_name'] # fork in this case
220 repo_name_full = form_data['repo_name_full']
220 repo_name_full = form_data['repo_name_full']
221 repo_type = form_data['repo_type']
221 repo_type = form_data['repo_type']
222 description = form_data['description']
222 description = form_data['description']
223 private = form_data['private']
223 private = form_data['private']
224 clone_uri = form_data.get('clone_uri')
224 clone_uri = form_data.get('clone_uri')
225 repo_group = safe_int(form_data['repo_group'])
225 repo_group = safe_int(form_data['repo_group'])
226 landing_rev = form_data['landing_rev']
226 landing_rev = form_data['landing_rev']
227 copy_fork_permissions = form_data.get('copy_permissions')
227 copy_fork_permissions = form_data.get('copy_permissions')
228 fork_id = safe_int(form_data.get('fork_parent_id'))
228 fork_id = safe_int(form_data.get('fork_parent_id'))
229
229
230 try:
230 try:
231 fork_of = RepoModel(DBS)._get_repo(fork_id)
231 fork_of = RepoModel(DBS)._get_repo(fork_id)
232 RepoModel(DBS)._create_repo(
232 RepoModel(DBS)._create_repo(
233 repo_name=repo_name_full,
233 repo_name=repo_name_full,
234 repo_type=repo_type,
234 repo_type=repo_type,
235 description=description,
235 description=description,
236 owner=owner,
236 owner=owner,
237 private=private,
237 private=private,
238 clone_uri=clone_uri,
238 clone_uri=clone_uri,
239 repo_group=repo_group,
239 repo_group=repo_group,
240 landing_rev=landing_rev,
240 landing_rev=landing_rev,
241 fork_of=fork_of,
241 fork_of=fork_of,
242 copy_fork_permissions=copy_fork_permissions
242 copy_fork_permissions=copy_fork_permissions
243 )
243 )
244 action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
244 action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
245 fork_of.repo_name, '', DBS)
245 fork_of.repo_name, '', DBS)
246 DBS.commit()
246 DBS.commit()
247
247
248 base_path = Repository.base_path()
248 base_path = Repository.base_path()
249 source_repo_path = os.path.join(base_path, fork_of.repo_name)
249 source_repo_path = os.path.join(base_path, fork_of.repo_name)
250
250
251 # now create this repo on Filesystem
251 # now create this repo on Filesystem
252 RepoModel(DBS)._create_filesystem_repo(
252 RepoModel(DBS)._create_filesystem_repo(
253 repo_name=repo_name,
253 repo_name=repo_name,
254 repo_type=repo_type,
254 repo_type=repo_type,
255 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
255 repo_group=RepoModel(DBS)._get_repo_group(repo_group),
256 clone_uri=source_repo_path,
256 clone_uri=source_repo_path,
257 )
257 )
258 repo = Repository.get_by_repo_name(repo_name_full)
258 repo = Repository.get_by_repo_name(repo_name_full)
259 log_create_repository(created_by=owner.username, **repo.get_dict())
259 log_create_repository(created_by=owner.username, **repo.get_dict())
260
260
261 # update repo commit caches initially
261 # update repo commit caches initially
262 config = repo._config
262 config = repo._config
263 config.set('extensions', 'largefiles', '')
263 config.set('extensions', 'largefiles', '')
264 repo.update_commit_cache(config=config)
264 repo.update_commit_cache(config=config)
265
265
266 # set new created state
266 # set new created state
267 repo.set_state(Repository.STATE_CREATED)
267 repo.set_state(Repository.STATE_CREATED)
268 DBS.commit()
268 DBS.commit()
269 except Exception as e:
269 except Exception as e:
270 log.warning('Exception %s occurred when forking repository, '
270 log.warning('Exception %s occurred when forking repository, '
271 'doing cleanup...', e)
271 'doing cleanup...', e)
272 # rollback things manually !
272 # rollback things manually !
273 repo = Repository.get_by_repo_name(repo_name_full)
273 repo = Repository.get_by_repo_name(repo_name_full)
274 if repo:
274 if repo:
275 Repository.delete(repo.repo_id)
275 Repository.delete(repo.repo_id)
276 DBS.commit()
276 DBS.commit()
277 RepoModel(DBS)._delete_filesystem_repo(repo)
277 RepoModel(DBS)._delete_filesystem_repo(repo)
278 raise
278 raise
279
279
280 # it's an odd fix to make celery fail task when exception occurs
280 # it's an odd fix to make celery fail task when exception occurs
281 def on_failure(self, *args, **kwargs):
281 def on_failure(self, *args, **kwargs):
282 pass
282 pass
283
283
284 return True
284 return True
General Comments 0
You need to be logged in to leave comments. Login now