# Infinite push # # Copyright 2016 Facebook, Inc. # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import import logging import os import time import warnings import mysql.connector from . import indexapi def _convertbookmarkpattern(pattern): pattern = pattern.replace('_', '\\_') pattern = pattern.replace('%', '\\%') if pattern.endswith('*'): pattern = pattern[:-1] + '%' return pattern class sqlindexapi(indexapi.indexapi): ''' Sql backend for infinitepush index. See schema.sql ''' def __init__( self, reponame, host, port, database, user, password, logfile, loglevel, waittimeout=300, locktimeout=120, ): super(sqlindexapi, self).__init__() self.reponame = reponame self.sqlargs = { 'host': host, 'port': port, 'database': database, 'user': user, 'password': password, } self.sqlconn = None self.sqlcursor = None if not logfile: logfile = os.devnull logging.basicConfig(filename=logfile) self.log = logging.getLogger() self.log.setLevel(loglevel) self._connected = False self._waittimeout = waittimeout self._locktimeout = locktimeout def sqlconnect(self): if self.sqlconn: raise indexapi.indexexception("SQL connection already open") if self.sqlcursor: raise indexapi.indexexception( "SQL cursor already open without" " connection" ) retry = 3 while True: try: self.sqlconn = mysql.connector.connect(**self.sqlargs) # Code is copy-pasted from hgsql. Bug fixes need to be # back-ported! # The default behavior is to return byte arrays, when we # need strings. This custom convert returns strings. self.sqlconn.set_converter_class(CustomConverter) self.sqlconn.autocommit = False break except mysql.connector.errors.Error: # mysql can be flakey occasionally, so do some minimal # retrying. retry -= 1 if retry == 0: raise time.sleep(0.2) waittimeout = self.sqlconn.converter.escape('%s' % self._waittimeout) self.sqlcursor = self.sqlconn.cursor() self.sqlcursor.execute("SET wait_timeout=%s" % waittimeout) self.sqlcursor.execute( "SET innodb_lock_wait_timeout=%s" % self._locktimeout ) self._connected = True def close(self): """Cleans up the metadata store connection.""" with warnings.catch_warnings(): warnings.simplefilter("ignore") self.sqlcursor.close() self.sqlconn.close() self.sqlcursor = None self.sqlconn = None def __enter__(self): if not self._connected: self.sqlconnect() return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is None: self.sqlconn.commit() else: self.sqlconn.rollback() def addbundle(self, bundleid, nodesctx): if not self._connected: self.sqlconnect() self.log.info("ADD BUNDLE %r %r" % (self.reponame, bundleid)) self.sqlcursor.execute( "INSERT INTO bundles(bundle, reponame) VALUES " "(%s, %s)", params=(bundleid, self.reponame), ) for ctx in nodesctx: self.sqlcursor.execute( "INSERT INTO nodestobundle(node, bundle, reponame) " "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE " "bundle=VALUES(bundle)", params=(ctx.hex(), bundleid, self.reponame), ) extra = ctx.extra() author_name = ctx.user() committer_name = extra.get('committer', ctx.user()) author_date = int(ctx.date()[0]) committer_date = int(extra.get('committer_date', author_date)) self.sqlcursor.execute( "INSERT IGNORE INTO nodesmetadata(node, message, p1, p2, " "author, committer, author_date, committer_date, " "reponame) VALUES " "(%s, %s, %s, %s, %s, %s, %s, %s, %s)", params=( ctx.hex(), ctx.description(), ctx.p1().hex(), ctx.p2().hex(), author_name, committer_name, author_date, committer_date, self.reponame, ), ) def addbookmark(self, bookmark, node): """Takes a bookmark name and hash, and records mapping in the metadata store.""" if not self._connected: self.sqlconnect() self.log.info( "ADD BOOKMARKS %r bookmark: %r node: %r" % (self.reponame, bookmark, node) ) self.sqlcursor.execute( "INSERT INTO bookmarkstonode(bookmark, node, reponame) " "VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE node=VALUES(node)", params=(bookmark, node, self.reponame), ) def addmanybookmarks(self, bookmarks): if not self._connected: self.sqlconnect() args = [] values = [] for bookmark, node in bookmarks.iteritems(): args.append('(%s, %s, %s)') values.extend((bookmark, node, self.reponame)) args = ','.join(args) self.sqlcursor.execute( "INSERT INTO bookmarkstonode(bookmark, node, reponame) " "VALUES %s ON DUPLICATE KEY UPDATE node=VALUES(node)" % args, params=values, ) def deletebookmarks(self, patterns): """Accepts list of bookmark patterns and deletes them. If `commit` is set then bookmark will actually be deleted. Otherwise deletion will be delayed until the end of transaction. """ if not self._connected: self.sqlconnect() self.log.info("DELETE BOOKMARKS: %s" % patterns) for pattern in patterns: pattern = _convertbookmarkpattern(pattern) self.sqlcursor.execute( "DELETE from bookmarkstonode WHERE bookmark LIKE (%s) " "and reponame = %s", params=(pattern, self.reponame), ) def getbundle(self, node): """Returns the bundleid for the bundle that contains the given node.""" if not self._connected: self.sqlconnect() self.log.info("GET BUNDLE %r %r" % (self.reponame, node)) self.sqlcursor.execute( "SELECT bundle from nodestobundle " "WHERE node = %s AND reponame = %s", params=(node, self.reponame), ) result = self.sqlcursor.fetchall() if len(result) != 1 or len(result[0]) != 1: self.log.info("No matching node") return None bundle = result[0][0] self.log.info("Found bundle %r" % bundle) return bundle def getnode(self, bookmark): """Returns the node for the given bookmark. None if it doesn't exist.""" if not self._connected: self.sqlconnect() self.log.info( "GET NODE reponame: %r bookmark: %r" % (self.reponame, bookmark) ) self.sqlcursor.execute( "SELECT node from bookmarkstonode WHERE " "bookmark = %s AND reponame = %s", params=(bookmark, self.reponame), ) result = self.sqlcursor.fetchall() if len(result) != 1 or len(result[0]) != 1: self.log.info("No matching bookmark") return None node = result[0][0] self.log.info("Found node %r" % node) return node def getbookmarks(self, query): if not self._connected: self.sqlconnect() self.log.info( "QUERY BOOKMARKS reponame: %r query: %r" % (self.reponame, query) ) query = _convertbookmarkpattern(query) self.sqlcursor.execute( "SELECT bookmark, node from bookmarkstonode WHERE " "reponame = %s AND bookmark LIKE %s", params=(self.reponame, query), ) result = self.sqlcursor.fetchall() bookmarks = {} for row in result: if len(row) != 2: self.log.info("Bad row returned: %s" % row) continue bookmarks[row[0]] = row[1] return bookmarks def saveoptionaljsonmetadata(self, node, jsonmetadata): if not self._connected: self.sqlconnect() self.log.info( ( "INSERT METADATA, QUERY BOOKMARKS reponame: %r " + "node: %r, jsonmetadata: %s" ) % (self.reponame, node, jsonmetadata) ) self.sqlcursor.execute( "UPDATE nodesmetadata SET optional_json_metadata=%s WHERE " "reponame=%s AND node=%s", params=(jsonmetadata, self.reponame, node), ) class CustomConverter(mysql.connector.conversion.MySQLConverter): """Ensure that all values being returned are returned as python string (versus the default byte arrays).""" def _STRING_to_python(self, value, dsc=None): return str(value) def _VAR_STRING_to_python(self, value, dsc=None): return str(value) def _BLOB_to_python(self, value, dsc=None): return str(value)