##// END OF EJS Templates
infinitepush: don't force ipv6 while connecting to mysql server...
Pulkit Goyal -
r37251:c1fac387 default
parent child Browse files
Show More
@@ -1,257 +1,256
1 1 # Infinite push
2 2 #
3 3 # Copyright 2016 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import logging
11 11 import os
12 12 import time
13 13
14 14 import warnings
15 15 import mysql.connector
16 16
17 17 from . import indexapi
18 18
19 19 def _convertbookmarkpattern(pattern):
20 20 pattern = pattern.replace('_', '\\_')
21 21 pattern = pattern.replace('%', '\\%')
22 22 if pattern.endswith('*'):
23 23 pattern = pattern[:-1] + '%'
24 24 return pattern
25 25
26 26 class sqlindexapi(indexapi.indexapi):
27 27 '''
28 28 Sql backend for infinitepush index. See schema.sql
29 29 '''
30 30
31 31 def __init__(self, reponame, host, port,
32 32 database, user, password, logfile, loglevel,
33 33 waittimeout=300, locktimeout=120):
34 34 super(sqlindexapi, self).__init__()
35 35 self.reponame = reponame
36 36 self.sqlargs = {
37 37 'host': host,
38 38 'port': port,
39 39 'database': database,
40 40 'user': user,
41 41 'password': password,
42 42 }
43 43 self.sqlconn = None
44 44 self.sqlcursor = None
45 45 if not logfile:
46 46 logfile = os.devnull
47 47 logging.basicConfig(filename=logfile)
48 48 self.log = logging.getLogger()
49 49 self.log.setLevel(loglevel)
50 50 self._connected = False
51 51 self._waittimeout = waittimeout
52 52 self._locktimeout = locktimeout
53 53
54 54 def sqlconnect(self):
55 55 if self.sqlconn:
56 56 raise indexapi.indexexception("SQL connection already open")
57 57 if self.sqlcursor:
58 58 raise indexapi.indexexception("SQL cursor already open without"
59 59 " connection")
60 60 retry = 3
61 61 while True:
62 62 try:
63 self.sqlconn = mysql.connector.connect(
64 force_ipv6=True, **self.sqlargs)
63 self.sqlconn = mysql.connector.connect(**self.sqlargs)
65 64
66 65 # Code is copy-pasted from hgsql. Bug fixes need to be
67 66 # back-ported!
68 67 # The default behavior is to return byte arrays, when we
69 68 # need strings. This custom convert returns strings.
70 69 self.sqlconn.set_converter_class(CustomConverter)
71 70 self.sqlconn.autocommit = False
72 71 break
73 72 except mysql.connector.errors.Error:
74 73 # mysql can be flakey occasionally, so do some minimal
75 74 # retrying.
76 75 retry -= 1
77 76 if retry == 0:
78 77 raise
79 78 time.sleep(0.2)
80 79
81 80 waittimeout = self.sqlconn.converter.escape('%s' % self._waittimeout)
82 81
83 82 self.sqlcursor = self.sqlconn.cursor()
84 83 self.sqlcursor.execute("SET wait_timeout=%s" % waittimeout)
85 84 self.sqlcursor.execute("SET innodb_lock_wait_timeout=%s" %
86 85 self._locktimeout)
87 86 self._connected = True
88 87
89 88 def close(self):
90 89 """Cleans up the metadata store connection."""
91 90 with warnings.catch_warnings():
92 91 warnings.simplefilter("ignore")
93 92 self.sqlcursor.close()
94 93 self.sqlconn.close()
95 94 self.sqlcursor = None
96 95 self.sqlconn = None
97 96
98 97 def __enter__(self):
99 98 if not self._connected:
100 99 self.sqlconnect()
101 100 return self
102 101
103 102 def __exit__(self, exc_type, exc_val, exc_tb):
104 103 if exc_type is None:
105 104 self.sqlconn.commit()
106 105 else:
107 106 self.sqlconn.rollback()
108 107
109 108 def addbundle(self, bundleid, nodesctx):
110 109 if not self._connected:
111 110 self.sqlconnect()
112 111 self.log.info("ADD BUNDLE %r %r" % (self.reponame, bundleid))
113 112 self.sqlcursor.execute(
114 113 "INSERT INTO bundles(bundle, reponame) VALUES "
115 114 "(%s, %s)", params=(bundleid, self.reponame))
116 115 for ctx in nodesctx:
117 116 self.sqlcursor.execute(
118 117 "INSERT INTO nodestobundle(node, bundle, reponame) "
119 118 "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE "
120 119 "bundle=VALUES(bundle)",
121 120 params=(ctx.hex(), bundleid, self.reponame))
122 121
123 122 extra = ctx.extra()
124 123 author_name = ctx.user()
125 124 committer_name = extra.get('committer', ctx.user())
126 125 author_date = int(ctx.date()[0])
127 126 committer_date = int(extra.get('committer_date', author_date))
128 127 self.sqlcursor.execute(
129 128 "INSERT IGNORE INTO nodesmetadata(node, message, p1, p2, "
130 129 "author, committer, author_date, committer_date, "
131 130 "reponame) VALUES "
132 131 "(%s, %s, %s, %s, %s, %s, %s, %s, %s)",
133 132 params=(ctx.hex(), ctx.description(),
134 133 ctx.p1().hex(), ctx.p2().hex(), author_name,
135 134 committer_name, author_date, committer_date,
136 135 self.reponame)
137 136 )
138 137
139 138 def addbookmark(self, bookmark, node):
140 139 """Takes a bookmark name and hash, and records mapping in the metadata
141 140 store."""
142 141 if not self._connected:
143 142 self.sqlconnect()
144 143 self.log.info(
145 144 "ADD BOOKMARKS %r bookmark: %r node: %r" %
146 145 (self.reponame, bookmark, node))
147 146 self.sqlcursor.execute(
148 147 "INSERT INTO bookmarkstonode(bookmark, node, reponame) "
149 148 "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE node=VALUES(node)",
150 149 params=(bookmark, node, self.reponame))
151 150
152 151 def addmanybookmarks(self, bookmarks):
153 152 if not self._connected:
154 153 self.sqlconnect()
155 154 args = []
156 155 values = []
157 156 for bookmark, node in bookmarks.iteritems():
158 157 args.append('(%s, %s, %s)')
159 158 values.extend((bookmark, node, self.reponame))
160 159 args = ','.join(args)
161 160
162 161 self.sqlcursor.execute(
163 162 "INSERT INTO bookmarkstonode(bookmark, node, reponame) "
164 163 "VALUES %s ON DUPLICATE KEY UPDATE node=VALUES(node)" % args,
165 164 params=values)
166 165
167 166 def deletebookmarks(self, patterns):
168 167 """Accepts list of bookmark patterns and deletes them.
169 168 If `commit` is set then bookmark will actually be deleted. Otherwise
170 169 deletion will be delayed until the end of transaction.
171 170 """
172 171 if not self._connected:
173 172 self.sqlconnect()
174 173 self.log.info("DELETE BOOKMARKS: %s" % patterns)
175 174 for pattern in patterns:
176 175 pattern = _convertbookmarkpattern(pattern)
177 176 self.sqlcursor.execute(
178 177 "DELETE from bookmarkstonode WHERE bookmark LIKE (%s) "
179 178 "and reponame = %s",
180 179 params=(pattern, self.reponame))
181 180
182 181 def getbundle(self, node):
183 182 """Returns the bundleid for the bundle that contains the given node."""
184 183 if not self._connected:
185 184 self.sqlconnect()
186 185 self.log.info("GET BUNDLE %r %r" % (self.reponame, node))
187 186 self.sqlcursor.execute(
188 187 "SELECT bundle from nodestobundle "
189 188 "WHERE node = %s AND reponame = %s", params=(node, self.reponame))
190 189 result = self.sqlcursor.fetchall()
191 190 if len(result) != 1 or len(result[0]) != 1:
192 191 self.log.info("No matching node")
193 192 return None
194 193 bundle = result[0][0]
195 194 self.log.info("Found bundle %r" % bundle)
196 195 return bundle
197 196
198 197 def getnode(self, bookmark):
199 198 """Returns the node for the given bookmark. None if it doesn't exist."""
200 199 if not self._connected:
201 200 self.sqlconnect()
202 201 self.log.info(
203 202 "GET NODE reponame: %r bookmark: %r" % (self.reponame, bookmark))
204 203 self.sqlcursor.execute(
205 204 "SELECT node from bookmarkstonode WHERE "
206 205 "bookmark = %s AND reponame = %s", params=(bookmark, self.reponame))
207 206 result = self.sqlcursor.fetchall()
208 207 if len(result) != 1 or len(result[0]) != 1:
209 208 self.log.info("No matching bookmark")
210 209 return None
211 210 node = result[0][0]
212 211 self.log.info("Found node %r" % node)
213 212 return node
214 213
215 214 def getbookmarks(self, query):
216 215 if not self._connected:
217 216 self.sqlconnect()
218 217 self.log.info(
219 218 "QUERY BOOKMARKS reponame: %r query: %r" % (self.reponame, query))
220 219 query = _convertbookmarkpattern(query)
221 220 self.sqlcursor.execute(
222 221 "SELECT bookmark, node from bookmarkstonode WHERE "
223 222 "reponame = %s AND bookmark LIKE %s",
224 223 params=(self.reponame, query))
225 224 result = self.sqlcursor.fetchall()
226 225 bookmarks = {}
227 226 for row in result:
228 227 if len(row) != 2:
229 228 self.log.info("Bad row returned: %s" % row)
230 229 continue
231 230 bookmarks[row[0]] = row[1]
232 231 return bookmarks
233 232
234 233 def saveoptionaljsonmetadata(self, node, jsonmetadata):
235 234 if not self._connected:
236 235 self.sqlconnect()
237 236 self.log.info(
238 237 ("INSERT METADATA, QUERY BOOKMARKS reponame: %r " +
239 238 "node: %r, jsonmetadata: %s") %
240 239 (self.reponame, node, jsonmetadata))
241 240
242 241 self.sqlcursor.execute(
243 242 "UPDATE nodesmetadata SET optional_json_metadata=%s WHERE "
244 243 "reponame=%s AND node=%s",
245 244 params=(jsonmetadata, self.reponame, node))
246 245
247 246 class CustomConverter(mysql.connector.conversion.MySQLConverter):
248 247 """Ensure that all values being returned are returned as python string
249 248 (versus the default byte arrays)."""
250 249 def _STRING_to_python(self, value, dsc=None):
251 250 return str(value)
252 251
253 252 def _VAR_STRING_to_python(self, value, dsc=None):
254 253 return str(value)
255 254
256 255 def _BLOB_to_python(self, value, dsc=None):
257 256 return str(value)
General Comments 0
You need to be logged in to leave comments. Login now