sqlindexapi.py
296 lines
| 9.7 KiB
| text/x-python
|
PythonLexer
Pulkit Goyal
|
r37204 | # Infinite push | ||
# | ||||
# Copyright 2016 Facebook, Inc. | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
import logging | ||||
import os | ||||
import time | ||||
import warnings | ||||
import mysql.connector | ||||
from . import indexapi | ||||
Augie Fackler
|
r43346 | |||
Pulkit Goyal
|
r37204 | def _convertbookmarkpattern(pattern): | ||
Augie Fackler
|
r43347 | pattern = pattern.replace(b'_', b'\\_') | ||
pattern = pattern.replace(b'%', b'\\%') | ||||
if pattern.endswith(b'*'): | ||||
pattern = pattern[:-1] + b'%' | ||||
Pulkit Goyal
|
r37204 | return pattern | ||
Augie Fackler
|
r43346 | |||
Pulkit Goyal
|
r37204 | class sqlindexapi(indexapi.indexapi): | ||
''' | ||||
Sql backend for infinitepush index. See schema.sql | ||||
''' | ||||
Augie Fackler
|
r43346 | def __init__( | ||
self, | ||||
reponame, | ||||
host, | ||||
port, | ||||
database, | ||||
user, | ||||
password, | ||||
logfile, | ||||
loglevel, | ||||
waittimeout=300, | ||||
locktimeout=120, | ||||
): | ||||
Pulkit Goyal
|
r37204 | super(sqlindexapi, self).__init__() | ||
self.reponame = reponame | ||||
self.sqlargs = { | ||||
Augie Fackler
|
r43347 | b'host': host, | ||
b'port': port, | ||||
b'database': database, | ||||
b'user': user, | ||||
b'password': password, | ||||
Pulkit Goyal
|
r37204 | } | ||
self.sqlconn = None | ||||
self.sqlcursor = None | ||||
if not logfile: | ||||
logfile = os.devnull | ||||
logging.basicConfig(filename=logfile) | ||||
self.log = logging.getLogger() | ||||
self.log.setLevel(loglevel) | ||||
self._connected = False | ||||
self._waittimeout = waittimeout | ||||
self._locktimeout = locktimeout | ||||
def sqlconnect(self): | ||||
if self.sqlconn: | ||||
Augie Fackler
|
r43347 | raise indexapi.indexexception(b"SQL connection already open") | ||
Pulkit Goyal
|
r37204 | if self.sqlcursor: | ||
Augie Fackler
|
r43346 | raise indexapi.indexexception( | ||
Augie Fackler
|
r43347 | b"SQL cursor already open without" b" connection" | ||
Augie Fackler
|
r43346 | ) | ||
Pulkit Goyal
|
r37204 | retry = 3 | ||
while True: | ||||
try: | ||||
Pulkit Goyal
|
r37251 | self.sqlconn = mysql.connector.connect(**self.sqlargs) | ||
Pulkit Goyal
|
r37204 | |||
# Code is copy-pasted from hgsql. Bug fixes need to be | ||||
# back-ported! | ||||
# The default behavior is to return byte arrays, when we | ||||
# need strings. This custom convert returns strings. | ||||
self.sqlconn.set_converter_class(CustomConverter) | ||||
self.sqlconn.autocommit = False | ||||
break | ||||
except mysql.connector.errors.Error: | ||||
# mysql can be flakey occasionally, so do some minimal | ||||
# retrying. | ||||
retry -= 1 | ||||
if retry == 0: | ||||
raise | ||||
time.sleep(0.2) | ||||
Augie Fackler
|
r43347 | waittimeout = self.sqlconn.converter.escape(b'%s' % self._waittimeout) | ||
Pulkit Goyal
|
r37204 | |||
self.sqlcursor = self.sqlconn.cursor() | ||||
Augie Fackler
|
r43347 | self.sqlcursor.execute(b"SET wait_timeout=%s" % waittimeout) | ||
Augie Fackler
|
r43346 | self.sqlcursor.execute( | ||
Augie Fackler
|
r43347 | b"SET innodb_lock_wait_timeout=%s" % self._locktimeout | ||
Augie Fackler
|
r43346 | ) | ||
Pulkit Goyal
|
r37204 | self._connected = True | ||
def close(self): | ||||
"""Cleans up the metadata store connection.""" | ||||
with warnings.catch_warnings(): | ||||
Augie Fackler
|
r43347 | warnings.simplefilter(b"ignore") | ||
Pulkit Goyal
|
r37204 | self.sqlcursor.close() | ||
self.sqlconn.close() | ||||
self.sqlcursor = None | ||||
self.sqlconn = None | ||||
def __enter__(self): | ||||
if not self._connected: | ||||
self.sqlconnect() | ||||
return self | ||||
def __exit__(self, exc_type, exc_val, exc_tb): | ||||
if exc_type is None: | ||||
self.sqlconn.commit() | ||||
else: | ||||
self.sqlconn.rollback() | ||||
def addbundle(self, bundleid, nodesctx): | ||||
if not self._connected: | ||||
self.sqlconnect() | ||||
Augie Fackler
|
r43347 | self.log.info(b"ADD BUNDLE %r %r" % (self.reponame, bundleid)) | ||
Pulkit Goyal
|
r37204 | self.sqlcursor.execute( | ||
Augie Fackler
|
r43347 | b"INSERT INTO bundles(bundle, reponame) VALUES " b"(%s, %s)", | ||
Augie Fackler
|
r43346 | params=(bundleid, self.reponame), | ||
) | ||||
Pulkit Goyal
|
r37204 | for ctx in nodesctx: | ||
self.sqlcursor.execute( | ||||
Augie Fackler
|
r43347 | b"INSERT INTO nodestobundle(node, bundle, reponame) " | ||
b"VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE " | ||||
b"bundle=VALUES(bundle)", | ||||
Augie Fackler
|
r43346 | params=(ctx.hex(), bundleid, self.reponame), | ||
) | ||||
Pulkit Goyal
|
r37204 | |||
extra = ctx.extra() | ||||
author_name = ctx.user() | ||||
Augie Fackler
|
r43347 | committer_name = extra.get(b'committer', ctx.user()) | ||
Pulkit Goyal
|
r37204 | author_date = int(ctx.date()[0]) | ||
Augie Fackler
|
r43347 | committer_date = int(extra.get(b'committer_date', author_date)) | ||
Pulkit Goyal
|
r37204 | self.sqlcursor.execute( | ||
Augie Fackler
|
r43347 | b"INSERT IGNORE INTO nodesmetadata(node, message, p1, p2, " | ||
b"author, committer, author_date, committer_date, " | ||||
b"reponame) VALUES " | ||||
b"(%s, %s, %s, %s, %s, %s, %s, %s, %s)", | ||||
Augie Fackler
|
r43346 | params=( | ||
ctx.hex(), | ||||
ctx.description(), | ||||
ctx.p1().hex(), | ||||
ctx.p2().hex(), | ||||
author_name, | ||||
committer_name, | ||||
author_date, | ||||
committer_date, | ||||
self.reponame, | ||||
), | ||||
Pulkit Goyal
|
r37204 | ) | ||
def addbookmark(self, bookmark, node): | ||||
"""Takes a bookmark name and hash, and records mapping in the metadata | ||||
store.""" | ||||
if not self._connected: | ||||
self.sqlconnect() | ||||
self.log.info( | ||||
Augie Fackler
|
r43347 | b"ADD BOOKMARKS %r bookmark: %r node: %r" | ||
Augie Fackler
|
r43346 | % (self.reponame, bookmark, node) | ||
) | ||||
Pulkit Goyal
|
r37204 | self.sqlcursor.execute( | ||
Augie Fackler
|
r43347 | b"INSERT INTO bookmarkstonode(bookmark, node, reponame) " | ||
b"VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE node=VALUES(node)", | ||||
Augie Fackler
|
r43346 | params=(bookmark, node, self.reponame), | ||
) | ||||
Pulkit Goyal
|
r37204 | |||
def addmanybookmarks(self, bookmarks): | ||||
if not self._connected: | ||||
self.sqlconnect() | ||||
args = [] | ||||
values = [] | ||||
for bookmark, node in bookmarks.iteritems(): | ||||
Augie Fackler
|
r43347 | args.append(b'(%s, %s, %s)') | ||
Pulkit Goyal
|
r37204 | values.extend((bookmark, node, self.reponame)) | ||
Augie Fackler
|
r43347 | args = b','.join(args) | ||
Pulkit Goyal
|
r37204 | |||
self.sqlcursor.execute( | ||||
Augie Fackler
|
r43347 | b"INSERT INTO bookmarkstonode(bookmark, node, reponame) " | ||
b"VALUES %s ON DUPLICATE KEY UPDATE node=VALUES(node)" % args, | ||||
Augie Fackler
|
r43346 | params=values, | ||
) | ||||
Pulkit Goyal
|
r37204 | |||
def deletebookmarks(self, patterns): | ||||
"""Accepts list of bookmark patterns and deletes them. | ||||
If `commit` is set then bookmark will actually be deleted. Otherwise | ||||
deletion will be delayed until the end of transaction. | ||||
""" | ||||
if not self._connected: | ||||
self.sqlconnect() | ||||
Augie Fackler
|
r43347 | self.log.info(b"DELETE BOOKMARKS: %s" % patterns) | ||
Pulkit Goyal
|
r37204 | for pattern in patterns: | ||
pattern = _convertbookmarkpattern(pattern) | ||||
self.sqlcursor.execute( | ||||
Augie Fackler
|
r43347 | b"DELETE from bookmarkstonode WHERE bookmark LIKE (%s) " | ||
b"and reponame = %s", | ||||
Augie Fackler
|
r43346 | params=(pattern, self.reponame), | ||
) | ||||
Pulkit Goyal
|
r37204 | |||
def getbundle(self, node): | ||||
"""Returns the bundleid for the bundle that contains the given node.""" | ||||
if not self._connected: | ||||
self.sqlconnect() | ||||
Augie Fackler
|
r43347 | self.log.info(b"GET BUNDLE %r %r" % (self.reponame, node)) | ||
Pulkit Goyal
|
r37204 | self.sqlcursor.execute( | ||
Augie Fackler
|
r43347 | b"SELECT bundle from nodestobundle " | ||
b"WHERE node = %s AND reponame = %s", | ||||
Augie Fackler
|
r43346 | params=(node, self.reponame), | ||
) | ||||
Pulkit Goyal
|
r37204 | result = self.sqlcursor.fetchall() | ||
if len(result) != 1 or len(result[0]) != 1: | ||||
Augie Fackler
|
r43347 | self.log.info(b"No matching node") | ||
Pulkit Goyal
|
r37204 | return None | ||
bundle = result[0][0] | ||||
Augie Fackler
|
r43347 | self.log.info(b"Found bundle %r" % bundle) | ||
Pulkit Goyal
|
r37204 | return bundle | ||
def getnode(self, bookmark): | ||||
"""Returns the node for the given bookmark. None if it doesn't exist.""" | ||||
if not self._connected: | ||||
self.sqlconnect() | ||||
self.log.info( | ||||
Augie Fackler
|
r43347 | b"GET NODE reponame: %r bookmark: %r" % (self.reponame, bookmark) | ||
Augie Fackler
|
r43346 | ) | ||
Pulkit Goyal
|
r37204 | self.sqlcursor.execute( | ||
Augie Fackler
|
r43347 | b"SELECT node from bookmarkstonode WHERE " | ||
b"bookmark = %s AND reponame = %s", | ||||
Augie Fackler
|
r43346 | params=(bookmark, self.reponame), | ||
) | ||||
Pulkit Goyal
|
r37204 | result = self.sqlcursor.fetchall() | ||
if len(result) != 1 or len(result[0]) != 1: | ||||
Augie Fackler
|
r43347 | self.log.info(b"No matching bookmark") | ||
Pulkit Goyal
|
r37204 | return None | ||
node = result[0][0] | ||||
Augie Fackler
|
r43347 | self.log.info(b"Found node %r" % node) | ||
Pulkit Goyal
|
r37204 | return node | ||
def getbookmarks(self, query): | ||||
if not self._connected: | ||||
self.sqlconnect() | ||||
self.log.info( | ||||
Augie Fackler
|
r43347 | b"QUERY BOOKMARKS reponame: %r query: %r" % (self.reponame, query) | ||
Augie Fackler
|
r43346 | ) | ||
Pulkit Goyal
|
r37204 | query = _convertbookmarkpattern(query) | ||
self.sqlcursor.execute( | ||||
Augie Fackler
|
r43347 | b"SELECT bookmark, node from bookmarkstonode WHERE " | ||
b"reponame = %s AND bookmark LIKE %s", | ||||
Augie Fackler
|
r43346 | params=(self.reponame, query), | ||
) | ||||
Pulkit Goyal
|
r37204 | result = self.sqlcursor.fetchall() | ||
bookmarks = {} | ||||
for row in result: | ||||
if len(row) != 2: | ||||
Augie Fackler
|
r43347 | self.log.info(b"Bad row returned: %s" % row) | ||
Pulkit Goyal
|
r37204 | continue | ||
bookmarks[row[0]] = row[1] | ||||
return bookmarks | ||||
def saveoptionaljsonmetadata(self, node, jsonmetadata): | ||||
if not self._connected: | ||||
self.sqlconnect() | ||||
self.log.info( | ||||
Augie Fackler
|
r43346 | ( | ||
Augie Fackler
|
r43347 | b"INSERT METADATA, QUERY BOOKMARKS reponame: %r " | ||
+ b"node: %r, jsonmetadata: %s" | ||||
Augie Fackler
|
r43346 | ) | ||
% (self.reponame, node, jsonmetadata) | ||||
) | ||||
Pulkit Goyal
|
r37204 | |||
self.sqlcursor.execute( | ||||
Augie Fackler
|
r43347 | b"UPDATE nodesmetadata SET optional_json_metadata=%s WHERE " | ||
b"reponame=%s AND node=%s", | ||||
Augie Fackler
|
r43346 | params=(jsonmetadata, self.reponame, node), | ||
) | ||||
Pulkit Goyal
|
r37204 | |||
class CustomConverter(mysql.connector.conversion.MySQLConverter): | ||||
"""Ensure that all values being returned are returned as python string | ||||
(versus the default byte arrays).""" | ||||
Augie Fackler
|
r43346 | |||
Pulkit Goyal
|
r37204 | def _STRING_to_python(self, value, dsc=None): | ||
return str(value) | ||||
def _VAR_STRING_to_python(self, value, dsc=None): | ||||
return str(value) | ||||
def _BLOB_to_python(self, value, dsc=None): | ||||
return str(value) | ||||