##// END OF EJS Templates
infinitepush: don't force ipv6 while connecting to mysql server...
Pulkit Goyal -
r37251:c1fac387 default
parent child Browse files
Show More
@@ -1,257 +1,256
1 # Infinite push
1 # Infinite push
2 #
2 #
3 # Copyright 2016 Facebook, Inc.
3 # Copyright 2016 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import logging
10 import logging
11 import os
11 import os
12 import time
12 import time
13
13
14 import warnings
14 import warnings
15 import mysql.connector
15 import mysql.connector
16
16
17 from . import indexapi
17 from . import indexapi
18
18
19 def _convertbookmarkpattern(pattern):
19 def _convertbookmarkpattern(pattern):
20 pattern = pattern.replace('_', '\\_')
20 pattern = pattern.replace('_', '\\_')
21 pattern = pattern.replace('%', '\\%')
21 pattern = pattern.replace('%', '\\%')
22 if pattern.endswith('*'):
22 if pattern.endswith('*'):
23 pattern = pattern[:-1] + '%'
23 pattern = pattern[:-1] + '%'
24 return pattern
24 return pattern
25
25
26 class sqlindexapi(indexapi.indexapi):
26 class sqlindexapi(indexapi.indexapi):
27 '''
27 '''
28 Sql backend for infinitepush index. See schema.sql
28 Sql backend for infinitepush index. See schema.sql
29 '''
29 '''
30
30
31 def __init__(self, reponame, host, port,
31 def __init__(self, reponame, host, port,
32 database, user, password, logfile, loglevel,
32 database, user, password, logfile, loglevel,
33 waittimeout=300, locktimeout=120):
33 waittimeout=300, locktimeout=120):
34 super(sqlindexapi, self).__init__()
34 super(sqlindexapi, self).__init__()
35 self.reponame = reponame
35 self.reponame = reponame
36 self.sqlargs = {
36 self.sqlargs = {
37 'host': host,
37 'host': host,
38 'port': port,
38 'port': port,
39 'database': database,
39 'database': database,
40 'user': user,
40 'user': user,
41 'password': password,
41 'password': password,
42 }
42 }
43 self.sqlconn = None
43 self.sqlconn = None
44 self.sqlcursor = None
44 self.sqlcursor = None
45 if not logfile:
45 if not logfile:
46 logfile = os.devnull
46 logfile = os.devnull
47 logging.basicConfig(filename=logfile)
47 logging.basicConfig(filename=logfile)
48 self.log = logging.getLogger()
48 self.log = logging.getLogger()
49 self.log.setLevel(loglevel)
49 self.log.setLevel(loglevel)
50 self._connected = False
50 self._connected = False
51 self._waittimeout = waittimeout
51 self._waittimeout = waittimeout
52 self._locktimeout = locktimeout
52 self._locktimeout = locktimeout
53
53
54 def sqlconnect(self):
54 def sqlconnect(self):
55 if self.sqlconn:
55 if self.sqlconn:
56 raise indexapi.indexexception("SQL connection already open")
56 raise indexapi.indexexception("SQL connection already open")
57 if self.sqlcursor:
57 if self.sqlcursor:
58 raise indexapi.indexexception("SQL cursor already open without"
58 raise indexapi.indexexception("SQL cursor already open without"
59 " connection")
59 " connection")
60 retry = 3
60 retry = 3
61 while True:
61 while True:
62 try:
62 try:
63 self.sqlconn = mysql.connector.connect(
63 self.sqlconn = mysql.connector.connect(**self.sqlargs)
64 force_ipv6=True, **self.sqlargs)
65
64
66 # Code is copy-pasted from hgsql. Bug fixes need to be
65 # Code is copy-pasted from hgsql. Bug fixes need to be
67 # back-ported!
66 # back-ported!
68 # The default behavior is to return byte arrays, when we
67 # The default behavior is to return byte arrays, when we
69 # need strings. This custom convert returns strings.
68 # need strings. This custom convert returns strings.
70 self.sqlconn.set_converter_class(CustomConverter)
69 self.sqlconn.set_converter_class(CustomConverter)
71 self.sqlconn.autocommit = False
70 self.sqlconn.autocommit = False
72 break
71 break
73 except mysql.connector.errors.Error:
72 except mysql.connector.errors.Error:
74 # mysql can be flakey occasionally, so do some minimal
73 # mysql can be flakey occasionally, so do some minimal
75 # retrying.
74 # retrying.
76 retry -= 1
75 retry -= 1
77 if retry == 0:
76 if retry == 0:
78 raise
77 raise
79 time.sleep(0.2)
78 time.sleep(0.2)
80
79
81 waittimeout = self.sqlconn.converter.escape('%s' % self._waittimeout)
80 waittimeout = self.sqlconn.converter.escape('%s' % self._waittimeout)
82
81
83 self.sqlcursor = self.sqlconn.cursor()
82 self.sqlcursor = self.sqlconn.cursor()
84 self.sqlcursor.execute("SET wait_timeout=%s" % waittimeout)
83 self.sqlcursor.execute("SET wait_timeout=%s" % waittimeout)
85 self.sqlcursor.execute("SET innodb_lock_wait_timeout=%s" %
84 self.sqlcursor.execute("SET innodb_lock_wait_timeout=%s" %
86 self._locktimeout)
85 self._locktimeout)
87 self._connected = True
86 self._connected = True
88
87
89 def close(self):
88 def close(self):
90 """Cleans up the metadata store connection."""
89 """Cleans up the metadata store connection."""
91 with warnings.catch_warnings():
90 with warnings.catch_warnings():
92 warnings.simplefilter("ignore")
91 warnings.simplefilter("ignore")
93 self.sqlcursor.close()
92 self.sqlcursor.close()
94 self.sqlconn.close()
93 self.sqlconn.close()
95 self.sqlcursor = None
94 self.sqlcursor = None
96 self.sqlconn = None
95 self.sqlconn = None
97
96
98 def __enter__(self):
97 def __enter__(self):
99 if not self._connected:
98 if not self._connected:
100 self.sqlconnect()
99 self.sqlconnect()
101 return self
100 return self
102
101
103 def __exit__(self, exc_type, exc_val, exc_tb):
102 def __exit__(self, exc_type, exc_val, exc_tb):
104 if exc_type is None:
103 if exc_type is None:
105 self.sqlconn.commit()
104 self.sqlconn.commit()
106 else:
105 else:
107 self.sqlconn.rollback()
106 self.sqlconn.rollback()
108
107
109 def addbundle(self, bundleid, nodesctx):
108 def addbundle(self, bundleid, nodesctx):
110 if not self._connected:
109 if not self._connected:
111 self.sqlconnect()
110 self.sqlconnect()
112 self.log.info("ADD BUNDLE %r %r" % (self.reponame, bundleid))
111 self.log.info("ADD BUNDLE %r %r" % (self.reponame, bundleid))
113 self.sqlcursor.execute(
112 self.sqlcursor.execute(
114 "INSERT INTO bundles(bundle, reponame) VALUES "
113 "INSERT INTO bundles(bundle, reponame) VALUES "
115 "(%s, %s)", params=(bundleid, self.reponame))
114 "(%s, %s)", params=(bundleid, self.reponame))
116 for ctx in nodesctx:
115 for ctx in nodesctx:
117 self.sqlcursor.execute(
116 self.sqlcursor.execute(
118 "INSERT INTO nodestobundle(node, bundle, reponame) "
117 "INSERT INTO nodestobundle(node, bundle, reponame) "
119 "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE "
118 "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE "
120 "bundle=VALUES(bundle)",
119 "bundle=VALUES(bundle)",
121 params=(ctx.hex(), bundleid, self.reponame))
120 params=(ctx.hex(), bundleid, self.reponame))
122
121
123 extra = ctx.extra()
122 extra = ctx.extra()
124 author_name = ctx.user()
123 author_name = ctx.user()
125 committer_name = extra.get('committer', ctx.user())
124 committer_name = extra.get('committer', ctx.user())
126 author_date = int(ctx.date()[0])
125 author_date = int(ctx.date()[0])
127 committer_date = int(extra.get('committer_date', author_date))
126 committer_date = int(extra.get('committer_date', author_date))
128 self.sqlcursor.execute(
127 self.sqlcursor.execute(
129 "INSERT IGNORE INTO nodesmetadata(node, message, p1, p2, "
128 "INSERT IGNORE INTO nodesmetadata(node, message, p1, p2, "
130 "author, committer, author_date, committer_date, "
129 "author, committer, author_date, committer_date, "
131 "reponame) VALUES "
130 "reponame) VALUES "
132 "(%s, %s, %s, %s, %s, %s, %s, %s, %s)",
131 "(%s, %s, %s, %s, %s, %s, %s, %s, %s)",
133 params=(ctx.hex(), ctx.description(),
132 params=(ctx.hex(), ctx.description(),
134 ctx.p1().hex(), ctx.p2().hex(), author_name,
133 ctx.p1().hex(), ctx.p2().hex(), author_name,
135 committer_name, author_date, committer_date,
134 committer_name, author_date, committer_date,
136 self.reponame)
135 self.reponame)
137 )
136 )
138
137
139 def addbookmark(self, bookmark, node):
138 def addbookmark(self, bookmark, node):
140 """Takes a bookmark name and hash, and records mapping in the metadata
139 """Takes a bookmark name and hash, and records mapping in the metadata
141 store."""
140 store."""
142 if not self._connected:
141 if not self._connected:
143 self.sqlconnect()
142 self.sqlconnect()
144 self.log.info(
143 self.log.info(
145 "ADD BOOKMARKS %r bookmark: %r node: %r" %
144 "ADD BOOKMARKS %r bookmark: %r node: %r" %
146 (self.reponame, bookmark, node))
145 (self.reponame, bookmark, node))
147 self.sqlcursor.execute(
146 self.sqlcursor.execute(
148 "INSERT INTO bookmarkstonode(bookmark, node, reponame) "
147 "INSERT INTO bookmarkstonode(bookmark, node, reponame) "
149 "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE node=VALUES(node)",
148 "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE node=VALUES(node)",
150 params=(bookmark, node, self.reponame))
149 params=(bookmark, node, self.reponame))
151
150
152 def addmanybookmarks(self, bookmarks):
151 def addmanybookmarks(self, bookmarks):
153 if not self._connected:
152 if not self._connected:
154 self.sqlconnect()
153 self.sqlconnect()
155 args = []
154 args = []
156 values = []
155 values = []
157 for bookmark, node in bookmarks.iteritems():
156 for bookmark, node in bookmarks.iteritems():
158 args.append('(%s, %s, %s)')
157 args.append('(%s, %s, %s)')
159 values.extend((bookmark, node, self.reponame))
158 values.extend((bookmark, node, self.reponame))
160 args = ','.join(args)
159 args = ','.join(args)
161
160
162 self.sqlcursor.execute(
161 self.sqlcursor.execute(
163 "INSERT INTO bookmarkstonode(bookmark, node, reponame) "
162 "INSERT INTO bookmarkstonode(bookmark, node, reponame) "
164 "VALUES %s ON DUPLICATE KEY UPDATE node=VALUES(node)" % args,
163 "VALUES %s ON DUPLICATE KEY UPDATE node=VALUES(node)" % args,
165 params=values)
164 params=values)
166
165
167 def deletebookmarks(self, patterns):
166 def deletebookmarks(self, patterns):
168 """Accepts list of bookmark patterns and deletes them.
167 """Accepts list of bookmark patterns and deletes them.
169 If `commit` is set then bookmark will actually be deleted. Otherwise
168 If `commit` is set then bookmark will actually be deleted. Otherwise
170 deletion will be delayed until the end of transaction.
169 deletion will be delayed until the end of transaction.
171 """
170 """
172 if not self._connected:
171 if not self._connected:
173 self.sqlconnect()
172 self.sqlconnect()
174 self.log.info("DELETE BOOKMARKS: %s" % patterns)
173 self.log.info("DELETE BOOKMARKS: %s" % patterns)
175 for pattern in patterns:
174 for pattern in patterns:
176 pattern = _convertbookmarkpattern(pattern)
175 pattern = _convertbookmarkpattern(pattern)
177 self.sqlcursor.execute(
176 self.sqlcursor.execute(
178 "DELETE from bookmarkstonode WHERE bookmark LIKE (%s) "
177 "DELETE from bookmarkstonode WHERE bookmark LIKE (%s) "
179 "and reponame = %s",
178 "and reponame = %s",
180 params=(pattern, self.reponame))
179 params=(pattern, self.reponame))
181
180
182 def getbundle(self, node):
181 def getbundle(self, node):
183 """Returns the bundleid for the bundle that contains the given node."""
182 """Returns the bundleid for the bundle that contains the given node."""
184 if not self._connected:
183 if not self._connected:
185 self.sqlconnect()
184 self.sqlconnect()
186 self.log.info("GET BUNDLE %r %r" % (self.reponame, node))
185 self.log.info("GET BUNDLE %r %r" % (self.reponame, node))
187 self.sqlcursor.execute(
186 self.sqlcursor.execute(
188 "SELECT bundle from nodestobundle "
187 "SELECT bundle from nodestobundle "
189 "WHERE node = %s AND reponame = %s", params=(node, self.reponame))
188 "WHERE node = %s AND reponame = %s", params=(node, self.reponame))
190 result = self.sqlcursor.fetchall()
189 result = self.sqlcursor.fetchall()
191 if len(result) != 1 or len(result[0]) != 1:
190 if len(result) != 1 or len(result[0]) != 1:
192 self.log.info("No matching node")
191 self.log.info("No matching node")
193 return None
192 return None
194 bundle = result[0][0]
193 bundle = result[0][0]
195 self.log.info("Found bundle %r" % bundle)
194 self.log.info("Found bundle %r" % bundle)
196 return bundle
195 return bundle
197
196
198 def getnode(self, bookmark):
197 def getnode(self, bookmark):
199 """Returns the node for the given bookmark. None if it doesn't exist."""
198 """Returns the node for the given bookmark. None if it doesn't exist."""
200 if not self._connected:
199 if not self._connected:
201 self.sqlconnect()
200 self.sqlconnect()
202 self.log.info(
201 self.log.info(
203 "GET NODE reponame: %r bookmark: %r" % (self.reponame, bookmark))
202 "GET NODE reponame: %r bookmark: %r" % (self.reponame, bookmark))
204 self.sqlcursor.execute(
203 self.sqlcursor.execute(
205 "SELECT node from bookmarkstonode WHERE "
204 "SELECT node from bookmarkstonode WHERE "
206 "bookmark = %s AND reponame = %s", params=(bookmark, self.reponame))
205 "bookmark = %s AND reponame = %s", params=(bookmark, self.reponame))
207 result = self.sqlcursor.fetchall()
206 result = self.sqlcursor.fetchall()
208 if len(result) != 1 or len(result[0]) != 1:
207 if len(result) != 1 or len(result[0]) != 1:
209 self.log.info("No matching bookmark")
208 self.log.info("No matching bookmark")
210 return None
209 return None
211 node = result[0][0]
210 node = result[0][0]
212 self.log.info("Found node %r" % node)
211 self.log.info("Found node %r" % node)
213 return node
212 return node
214
213
215 def getbookmarks(self, query):
214 def getbookmarks(self, query):
216 if not self._connected:
215 if not self._connected:
217 self.sqlconnect()
216 self.sqlconnect()
218 self.log.info(
217 self.log.info(
219 "QUERY BOOKMARKS reponame: %r query: %r" % (self.reponame, query))
218 "QUERY BOOKMARKS reponame: %r query: %r" % (self.reponame, query))
220 query = _convertbookmarkpattern(query)
219 query = _convertbookmarkpattern(query)
221 self.sqlcursor.execute(
220 self.sqlcursor.execute(
222 "SELECT bookmark, node from bookmarkstonode WHERE "
221 "SELECT bookmark, node from bookmarkstonode WHERE "
223 "reponame = %s AND bookmark LIKE %s",
222 "reponame = %s AND bookmark LIKE %s",
224 params=(self.reponame, query))
223 params=(self.reponame, query))
225 result = self.sqlcursor.fetchall()
224 result = self.sqlcursor.fetchall()
226 bookmarks = {}
225 bookmarks = {}
227 for row in result:
226 for row in result:
228 if len(row) != 2:
227 if len(row) != 2:
229 self.log.info("Bad row returned: %s" % row)
228 self.log.info("Bad row returned: %s" % row)
230 continue
229 continue
231 bookmarks[row[0]] = row[1]
230 bookmarks[row[0]] = row[1]
232 return bookmarks
231 return bookmarks
233
232
234 def saveoptionaljsonmetadata(self, node, jsonmetadata):
233 def saveoptionaljsonmetadata(self, node, jsonmetadata):
235 if not self._connected:
234 if not self._connected:
236 self.sqlconnect()
235 self.sqlconnect()
237 self.log.info(
236 self.log.info(
238 ("INSERT METADATA, QUERY BOOKMARKS reponame: %r " +
237 ("INSERT METADATA, QUERY BOOKMARKS reponame: %r " +
239 "node: %r, jsonmetadata: %s") %
238 "node: %r, jsonmetadata: %s") %
240 (self.reponame, node, jsonmetadata))
239 (self.reponame, node, jsonmetadata))
241
240
242 self.sqlcursor.execute(
241 self.sqlcursor.execute(
243 "UPDATE nodesmetadata SET optional_json_metadata=%s WHERE "
242 "UPDATE nodesmetadata SET optional_json_metadata=%s WHERE "
244 "reponame=%s AND node=%s",
243 "reponame=%s AND node=%s",
245 params=(jsonmetadata, self.reponame, node))
244 params=(jsonmetadata, self.reponame, node))
246
245
247 class CustomConverter(mysql.connector.conversion.MySQLConverter):
246 class CustomConverter(mysql.connector.conversion.MySQLConverter):
248 """Ensure that all values being returned are returned as python string
247 """Ensure that all values being returned are returned as python string
249 (versus the default byte arrays)."""
248 (versus the default byte arrays)."""
250 def _STRING_to_python(self, value, dsc=None):
249 def _STRING_to_python(self, value, dsc=None):
251 return str(value)
250 return str(value)
252
251
253 def _VAR_STRING_to_python(self, value, dsc=None):
252 def _VAR_STRING_to_python(self, value, dsc=None):
254 return str(value)
253 return str(value)
255
254
256 def _BLOB_to_python(self, value, dsc=None):
255 def _BLOB_to_python(self, value, dsc=None):
257 return str(value)
256 return str(value)
General Comments 0
You need to be logged in to leave comments. Login now