##// END OF EJS Templates
events: ensure stable execution of integrations
marcink -
r3806:5bb0de26 stable
parent child Browse files
Show More
@@ -1,221 +1,241 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2011-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 """
23 23 Model for integrations
24 24 """
25 25
26 26
27 27 import logging
28 28
29 29 from sqlalchemy import or_, and_
30 30
31 31 import rhodecode
32 32 from rhodecode import events
33 33 from rhodecode.integrations.types.base import EEIntegration
34 34 from rhodecode.lib.caching_query import FromCache
35 35 from rhodecode.model import BaseModel
36 from rhodecode.model.db import Integration, Repository, RepoGroup, true, false
36 from rhodecode.model.db import Integration, Repository, RepoGroup, true, false, case
37 37 from rhodecode.integrations import integration_type_registry
38 38
39 39 log = logging.getLogger(__name__)
40 40
41 41
42 42 class IntegrationModel(BaseModel):
43 43
44 44 cls = Integration
45 45
46 46 def __get_integration(self, integration):
47 47 if isinstance(integration, Integration):
48 48 return integration
49 49 elif isinstance(integration, (int, long)):
50 50 return self.sa.query(Integration).get(integration)
51 51 else:
52 52 if integration:
53 53 raise Exception('integration must be int, long or Instance'
54 54 ' of Integration got %s' % type(integration))
55 55
56 56 def create(self, IntegrationType, name, enabled, repo, repo_group,
57 57 child_repos_only, settings):
58 58 """ Create an IntegrationType integration """
59 59 integration = Integration()
60 60 integration.integration_type = IntegrationType.key
61 61 self.sa.add(integration)
62 62 self.update_integration(integration, name, enabled, repo, repo_group,
63 63 child_repos_only, settings)
64 64 self.sa.commit()
65 65 return integration
66 66
67 67 def update_integration(self, integration, name, enabled, repo, repo_group,
68 68 child_repos_only, settings):
69 69 integration = self.__get_integration(integration)
70 70
71 71 integration.repo = repo
72 72 integration.repo_group = repo_group
73 73 integration.child_repos_only = child_repos_only
74 74 integration.name = name
75 75 integration.enabled = enabled
76 76 integration.settings = settings
77 77
78 78 return integration
79 79
80 80 def delete(self, integration):
81 81 integration = self.__get_integration(integration)
82 82 if integration:
83 83 self.sa.delete(integration)
84 84 return True
85 85 return False
86 86
87 87 def get_integration_handler(self, integration):
88 88 TypeClass = integration_type_registry.get(integration.integration_type)
89 89 if not TypeClass:
90 90 log.error('No class could be found for integration type: {}'.format(
91 91 integration.integration_type))
92 92 return None
93 93 elif isinstance(TypeClass, EEIntegration) or issubclass(TypeClass, EEIntegration):
94 94 log.error('EE integration cannot be '
95 95 'executed for integration type: {}'.format(
96 96 integration.integration_type))
97 97 return None
98 98
99 99 return TypeClass(integration.settings)
100 100
101 101 def send_event(self, integration, event):
102 102 """ Send an event to an integration """
103 103 handler = self.get_integration_handler(integration)
104 104 if handler:
105 105 log.debug(
106 106 'events: sending event %s on integration %s using handler %s',
107 107 event, integration, handler)
108 108 handler.send_event(event)
109 109
110 110 def get_integrations(self, scope, IntegrationType=None):
111 111 """
112 112 Return integrations for a scope, which must be one of:
113 113
114 114 'all' - every integration, global/repogroup/repo
115 115 'global' - global integrations only
116 116 <Repository> instance - integrations for this repo only
117 117 <RepoGroup> instance - integrations for this repogroup only
118 118 """
119 119
120 120 if isinstance(scope, Repository):
121 121 query = self.sa.query(Integration).filter(
122 122 Integration.repo == scope)
123 123 elif isinstance(scope, RepoGroup):
124 124 query = self.sa.query(Integration).filter(
125 125 Integration.repo_group == scope)
126 126 elif scope == 'global':
127 127 # global integrations
128 128 query = self.sa.query(Integration).filter(
129 129 and_(Integration.repo_id == None, Integration.repo_group_id == None)
130 130 )
131 131 elif scope == 'root-repos':
132 132 query = self.sa.query(Integration).filter(
133 133 and_(Integration.repo_id == None,
134 134 Integration.repo_group_id == None,
135 135 Integration.child_repos_only == true())
136 136 )
137 137 elif scope == 'all':
138 138 query = self.sa.query(Integration)
139 139 else:
140 140 raise Exception(
141 141 "invalid `scope`, must be one of: "
142 142 "['global', 'all', <Repository>, <RepoGroup>]")
143 143
144 144 if IntegrationType is not None:
145 145 query = query.filter(
146 146 Integration.integration_type==IntegrationType.key)
147 147
148 148 result = []
149 149 for integration in query.all():
150 150 IntType = integration_type_registry.get(integration.integration_type)
151 151 result.append((IntType, integration))
152 152 return result
153 153
154 154 def get_for_event(self, event, cache=False):
155 155 """
156 156 Get integrations that match an event
157 157 """
158 # base query
158 159 query = self.sa.query(
159 160 Integration
160 161 ).filter(
161 162 Integration.enabled == true()
162 163 )
163 164
164 165 global_integrations_filter = and_(
165 166 Integration.repo_id == None,
166 167 Integration.repo_group_id == None,
167 Integration.child_repos_only == False,
168 Integration.child_repos_only == false(),
168 169 )
169 170
170 171 if isinstance(event, events.RepoEvent):
171 172 root_repos_integrations_filter = and_(
172 173 Integration.repo_id == None,
173 174 Integration.repo_group_id == None,
174 175 Integration.child_repos_only == true(),
175 176 )
176 177
177 178 clauses = [
178 179 global_integrations_filter,
179 180 ]
181 cases = [
182 (global_integrations_filter, 1),
183 (root_repos_integrations_filter, 2),
184 ]
180 185
181 # repo integrations
182 if event.repo.repo_id: # pre create events dont have a repo_id yet
183 clauses.append(
184 Integration.repo_id == event.repo.repo_id
186 # repo group integrations
187 if event.repo.group:
188 # repo group with only root level repos
189 group_child_repos_filter = and_(
190 Integration.repo_group_id == event.repo.group.group_id,
191 Integration.child_repos_only == true()
185 192 )
186 193
187 if event.repo.group:
188 clauses.append(
189 and_(
190 Integration.repo_group_id == event.repo.group.group_id,
191 Integration.child_repos_only == true()
192 )
194 clauses.append(group_child_repos_filter)
195 cases.append(
196 (group_child_repos_filter, 3),
193 197 )
198
194 199 # repo group cascade to kids
195 clauses.append(
196 and_(
197 Integration.repo_group_id.in_(
198 [group.group_id for group in
199 event.repo.groups_with_parents]
200 ),
201 Integration.child_repos_only == false()
202 )
200 group_recursive_repos_filter = and_(
201 Integration.repo_group_id.in_(
202 [group.group_id for group in event.repo.groups_with_parents]
203 ),
204 Integration.child_repos_only == false()
205 )
206 clauses.append(group_recursive_repos_filter)
207 cases.append(
208 (group_recursive_repos_filter, 4),
203 209 )
204 210
205 211 if not event.repo.group: # root repo
206 212 clauses.append(root_repos_integrations_filter)
207 213
214 # repo integrations
215 if event.repo.repo_id: # pre create events dont have a repo_id yet
216 specific_repo_filter = Integration.repo_id == event.repo.repo_id
217 clauses.append(specific_repo_filter)
218 cases.append(
219 (specific_repo_filter, 5),
220 )
221
222 order_by_criterion = case(cases)
223
208 224 query = query.filter(or_(*clauses))
225 query = query.order_by(order_by_criterion)
209 226
210 227 if cache:
211 228 cache_key = "get_enabled_repo_integrations_%i" % event.repo.repo_id
212 229 query = query.options(
213 230 FromCache("sql_cache_short", cache_key))
214 231 else: # only global integrations
232 order_by_criterion = Integration.integration_id
233
215 234 query = query.filter(global_integrations_filter)
235 query = query.order_by(order_by_criterion)
216 236 if cache:
217 237 query = query.options(
218 238 FromCache("sql_cache_short", "get_enabled_global_integrations"))
219 239
220 240 result = query.all()
221 241 return result
@@ -1,216 +1,216 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import time
22 22 import pytest
23 23
24 24 from rhodecode import events
25 25 from rhodecode.tests.fixture import Fixture
26 26 from rhodecode.model.db import Session, Integration
27 27 from rhodecode.model.integration import IntegrationModel
28 28
29 29
30 30 class TestDeleteScopesDeletesIntegrations(object):
31 31 def test_delete_repo_with_integration_deletes_integration(
32 32 self, repo_integration_stub):
33 33
34 34 Session().delete(repo_integration_stub.repo)
35 35 Session().commit()
36 36 Session().expire_all()
37 37 integration = Integration.get(repo_integration_stub.integration_id)
38 38 assert integration is None
39 39
40 40 def test_delete_repo_group_with_integration_deletes_integration(
41 41 self, repogroup_integration_stub):
42 42
43 43 Session().delete(repogroup_integration_stub.repo_group)
44 44 Session().commit()
45 45 Session().expire_all()
46 46 integration = Integration.get(repogroup_integration_stub.integration_id)
47 47 assert integration is None
48 48
49 49
50 50 @pytest.fixture
51 51 def integration_repos(request, StubIntegrationType, stub_integration_settings):
52 52 """
53 53 Create repositories and integrations for testing, and destroy them after
54 54
55 55 Structure:
56 56 root_repo
57 57 parent_group/
58 58 parent_repo
59 59 child_group/
60 60 child_repo
61 61 other_group/
62 62 other_repo
63 63 """
64 64 fixture = Fixture()
65 65
66 66
67 67 parent_group_id = 'int_test_parent_group_%s' % time.time()
68 68 parent_group = fixture.create_repo_group(parent_group_id)
69 69
70 70 other_group_id = 'int_test_other_group_%s' % time.time()
71 71 other_group = fixture.create_repo_group(other_group_id)
72 72
73 73 child_group_id = (
74 74 parent_group_id + '/' + 'int_test_child_group_%s' % time.time())
75 75 child_group = fixture.create_repo_group(child_group_id)
76 76
77 77 parent_repo_id = 'int_test_parent_repo_%s' % time.time()
78 78 parent_repo = fixture.create_repo(parent_repo_id, repo_group=parent_group)
79 79
80 80 child_repo_id = 'int_test_child_repo_%s' % time.time()
81 81 child_repo = fixture.create_repo(child_repo_id, repo_group=child_group)
82 82
83 83 other_repo_id = 'int_test_other_repo_%s' % time.time()
84 84 other_repo = fixture.create_repo(other_repo_id, repo_group=other_group)
85 85
86 86 root_repo_id = 'int_test_repo_root_%s' % time.time()
87 87 root_repo = fixture.create_repo(root_repo_id)
88 88
89 89 integrations = {}
90 90 for name, repo, repo_group, child_repos_only in [
91 91 ('global', None, None, None),
92 92 ('root_repos', None, None, True),
93 93 ('parent_repo', parent_repo, None, None),
94 94 ('child_repo', child_repo, None, None),
95 95 ('other_repo', other_repo, None, None),
96 96 ('root_repo', root_repo, None, None),
97 97 ('parent_group', None, parent_group, True),
98 98 ('parent_group_recursive', None, parent_group, False),
99 99 ('child_group', None, child_group, True),
100 100 ('child_group_recursive', None, child_group, False),
101 101 ('other_group', None, other_group, True),
102 102 ('other_group_recursive', None, other_group, False),
103 103 ]:
104 104 integrations[name] = IntegrationModel().create(
105 105 StubIntegrationType, settings=stub_integration_settings,
106 106 enabled=True, name='test %s integration' % name,
107 107 repo=repo, repo_group=repo_group, child_repos_only=child_repos_only)
108 108
109 109 Session().commit()
110 110
111 111 def _cleanup():
112 112 for integration in integrations.values():
113 113 Session.delete(integration)
114 114
115 115 fixture.destroy_repo(root_repo)
116 116 fixture.destroy_repo(child_repo)
117 117 fixture.destroy_repo(parent_repo)
118 118 fixture.destroy_repo(other_repo)
119 119 fixture.destroy_repo_group(child_group)
120 120 fixture.destroy_repo_group(parent_group)
121 121 fixture.destroy_repo_group(other_group)
122 122
123 123 request.addfinalizer(_cleanup)
124 124
125 125 return {
126 126 'integrations': integrations,
127 127 'repos': {
128 128 'root_repo': root_repo,
129 129 'other_repo': other_repo,
130 130 'parent_repo': parent_repo,
131 'child_repo': child_repo,
131 'child_repo': child_repo,
132 132 }
133 133 }
134 134
135 135
136 136 def test_enabled_integration_repo_scopes(integration_repos):
137 137 integrations = integration_repos['integrations']
138 138 repos = integration_repos['repos']
139 139
140 140 triggered_integrations = IntegrationModel().get_for_event(
141 141 events.RepoEvent(repos['root_repo']))
142 142
143 143 assert triggered_integrations == [
144 144 integrations['global'],
145 145 integrations['root_repos'],
146 146 integrations['root_repo'],
147 147 ]
148 148
149 149 triggered_integrations = IntegrationModel().get_for_event(
150 150 events.RepoEvent(repos['other_repo']))
151 151
152 152 assert triggered_integrations == [
153 153 integrations['global'],
154 integrations['other_repo'],
155 154 integrations['other_group'],
156 155 integrations['other_group_recursive'],
156 integrations['other_repo'],
157 157 ]
158 158
159 159 triggered_integrations = IntegrationModel().get_for_event(
160 160 events.RepoEvent(repos['parent_repo']))
161 161
162 162 assert triggered_integrations == [
163 163 integrations['global'],
164 integrations['parent_repo'],
165 164 integrations['parent_group'],
166 165 integrations['parent_group_recursive'],
166 integrations['parent_repo'],
167 167 ]
168 168
169 169 triggered_integrations = IntegrationModel().get_for_event(
170 170 events.RepoEvent(repos['child_repo']))
171 171
172 172 assert triggered_integrations == [
173 173 integrations['global'],
174 integrations['child_repo'],
174 integrations['child_group'],
175 175 integrations['parent_group_recursive'],
176 integrations['child_group'],
177 176 integrations['child_group_recursive'],
177 integrations['child_repo'],
178 178 ]
179 179
180 180
181 181 def test_disabled_integration_repo_scopes(integration_repos):
182 182 integrations = integration_repos['integrations']
183 183 repos = integration_repos['repos']
184 184
185 185 for integration in integrations.values():
186 186 integration.enabled = False
187 187 Session().commit()
188 188
189 189 triggered_integrations = IntegrationModel().get_for_event(
190 190 events.RepoEvent(repos['root_repo']))
191 191
192 192 assert triggered_integrations == []
193 193
194 194 triggered_integrations = IntegrationModel().get_for_event(
195 195 events.RepoEvent(repos['parent_repo']))
196 196
197 197 assert triggered_integrations == []
198 198
199 199 triggered_integrations = IntegrationModel().get_for_event(
200 200 events.RepoEvent(repos['child_repo']))
201 201
202 202 assert triggered_integrations == []
203 203
204 204 triggered_integrations = IntegrationModel().get_for_event(
205 205 events.RepoEvent(repos['other_repo']))
206 206
207 207 assert triggered_integrations == []
208 208
209 209
210 210 def test_enabled_non_repo_integrations(integration_repos):
211 211 integrations = integration_repos['integrations']
212 212
213 213 triggered_integrations = IntegrationModel().get_for_event(
214 214 events.UserPreCreate({}))
215 215
216 216 assert triggered_integrations == [integrations['global']]
General Comments 0
You need to be logged in to leave comments. Login now