##// END OF EJS Templates
store: make `walk` return an entry for obsolescence if requested so...
store: make `walk` return an entry for obsolescence if requested so. Instead of having dedicated code in the streamclone code, we should have the store deal with advertising the data it contains.

File last commit:

r49768:f254fc73 default
r51407:0925eaf0 default
Show More
sqlindexapi.py
295 lines | 9.7 KiB | text/x-python | PythonLexer
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204 # Infinite push
#
# Copyright 2016 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
import logging
import os
import time
import warnings
import mysql.connector
from . import indexapi
Augie Fackler
formatting: blacken the codebase...
r43346
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204 def _convertbookmarkpattern(pattern):
Augie Fackler
formatting: byteify all mercurial/ and hgext/ string literals...
r43347 pattern = pattern.replace(b'_', b'\\_')
pattern = pattern.replace(b'%', b'\\%')
if pattern.endswith(b'*'):
pattern = pattern[:-1] + b'%'
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204 return pattern
Augie Fackler
formatting: blacken the codebase...
r43346
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204 class sqlindexapi(indexapi.indexapi):
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 """
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204 Sql backend for infinitepush index. See schema.sql
Augie Fackler
formating: upgrade to black 20.8b1...
r46554 """
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204
Augie Fackler
formatting: blacken the codebase...
def __init__(
    self,
    reponame,
    host,
    port,
    database,
    user,
    password,
    logfile,
    loglevel,
    waittimeout=300,
    locktimeout=120,
):
    """Store the connection settings and configure logging.

    No connection is opened here; ``sqlconn`` and ``sqlcursor`` start as
    None and ``_connected`` as False.  ``logfile`` falls back to
    ``os.devnull`` when it is empty, and ``loglevel`` is applied to the
    root logger.  ``waittimeout`` and ``locktimeout`` are kept for
    ``sqlconnect``.
    """
    super(sqlindexapi, self).__init__()
    self.reponame = reponame

    # Connection parameters, kept in one mapping for the connect call.
    connectionsettings = {}
    connectionsettings[b'host'] = host
    connectionsettings[b'port'] = port
    connectionsettings[b'database'] = database
    connectionsettings[b'user'] = user
    connectionsettings[b'password'] = password
    self.sqlargs = connectionsettings

    self.sqlconn = None
    self.sqlcursor = None

    # An empty/None logfile means "discard the log output".
    logging.basicConfig(filename=logfile or os.devnull)
    rootlogger = logging.getLogger()
    self.log = rootlogger
    rootlogger.setLevel(loglevel)

    self._connected = False
    self._waittimeout = waittimeout
    self._locktimeout = locktimeout
def sqlconnect(self):
    """Open the MySQL connection and cursor and apply the session timeouts.

    The connection is attempted up to three times, sleeping 0.2 seconds
    between attempts; the last ``mysql.connector`` error is re-raised when
    all attempts fail.  On success the connection uses ``CustomConverter``,
    has autocommit disabled, and ``_connected`` is set to True.

    Raises ``indexapi.indexexception`` if a connection or a cursor is
    already open.
    """
    if self.sqlconn:
        raise indexapi.indexexception(b"SQL connection already open")
    if self.sqlcursor:
        raise indexapi.indexexception(
            b"SQL cursor already open without connection"
        )

    # ``**`` only accepts ``str`` keyword names, so decode bytes keys
    # (str keys are passed through unchanged).
    # NOTE(review): the values are passed as given; confirm whether the
    # connector accepts bytes for host/user/password/database.
    connectargs = {
        (key.decode('ascii') if isinstance(key, bytes) else key): value
        for key, value in self.sqlargs.items()
    }

    retry = 3
    while True:
        try:
            self.sqlconn = mysql.connector.connect(**connectargs)

            # Code is copy-pasted from hgsql. Bug fixes need to be
            # back-ported!
            # The default behavior is to return byte arrays, when we
            # need strings. This custom convert returns strings.
            self.sqlconn.set_converter_class(CustomConverter)
            self.sqlconn.autocommit = False
            break
        except mysql.connector.errors.Error:
            # mysql can be flakey occasionally, so do some minimal
            # retrying.
            retry -= 1
            if retry == 0:
                raise
            time.sleep(0.2)

    # bytes %-formatting accepts only bytes-like objects for ``%s``, so the
    # integer timeouts are rendered with ``%d``.
    waittimeout = self.sqlconn.converter.escape(
        b'%d' % int(self._waittimeout)
    )

    self.sqlcursor = self.sqlconn.cursor()
    self.sqlcursor.execute(b"SET wait_timeout=%s" % waittimeout)
    self.sqlcursor.execute(
        b"SET innodb_lock_wait_timeout=%d" % int(self._locktimeout)
    )
    self._connected = True
def close(self):
    """Cleans up the metadata store connection.

    Closes the cursor and the connection if they are open, clears both
    references and marks the index as disconnected so the next operation
    reconnects.  Calling it when nothing is open is a no-op.
    """
    with warnings.catch_warnings():
        # The filter action has to be a ``str``; a bytes action is
        # rejected by the warnings module.
        warnings.simplefilter("ignore")
        if self.sqlcursor is not None:
            self.sqlcursor.close()
        if self.sqlconn is not None:
            self.sqlconn.close()
    self.sqlcursor = None
    self.sqlconn = None
    # Without this the other methods would skip sqlconnect() and call
    # methods on the None cursor.
    self._connected = False
def __enter__(self):
    """Enter the context, connecting first if needed, and return self."""
    if self._connected:
        return self
    self.sqlconnect()
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Commit on a clean exit, roll back when an exception is in flight.

    Returns None, so an exception raised inside the ``with`` body is not
    suppressed.
    """
    failed = exc_type is not None
    if failed:
        self.sqlconn.rollback()
    else:
        self.sqlconn.commit()
def addbundle(self, bundleid, nodesctx):
    """Record a bundle and the changesets it contains.

    Inserts a ``bundles`` row for ``bundleid``.  Then, for each context in
    ``nodesctx``, maps the node to the bundle in ``nodestobundle``
    (re-pointing an existing node at this bundle) and inserts its
    description, parents, author/committer and dates into
    ``nodesmetadata`` (existing rows are left alone via INSERT IGNORE).
    Connects first if no connection is open.
    """
    if not self._connected:
        self.sqlconnect()
    self.log.info(b"ADD BUNDLE %r %r" % (self.reponame, bundleid))

    self.sqlcursor.execute(
        b"INSERT INTO bundles(bundle, reponame) VALUES (%s, %s)",
        params=(bundleid, self.reponame),
    )

    nodetobundlesql = (
        b"INSERT INTO nodestobundle(node, bundle, reponame) "
        b"VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE "
        b"bundle=VALUES(bundle)"
    )
    metadatasql = (
        b"INSERT IGNORE INTO nodesmetadata(node, message, p1, p2, "
        b"author, committer, author_date, committer_date, "
        b"reponame) VALUES "
        b"(%s, %s, %s, %s, %s, %s, %s, %s, %s)"
    )

    for ctx in nodesctx:
        self.sqlcursor.execute(
            nodetobundlesql,
            params=(ctx.hex(), bundleid, self.reponame),
        )

        # Committer name/date come from the changeset extras when present
        # and otherwise fall back to the author's.
        extras = ctx.extra()
        author = ctx.user()
        committer = extras.get(b'committer', ctx.user())
        authored = int(ctx.date()[0])
        committed = int(extras.get(b'committer_date', authored))

        metadatarow = (
            ctx.hex(),
            ctx.description(),
            ctx.p1().hex(),
            ctx.p2().hex(),
            author,
            committer,
            authored,
            committed,
            self.reponame,
        )
        self.sqlcursor.execute(metadatasql, params=metadatarow)
def addbookmark(self, bookmark, node):
    """Record the bookmark -> node mapping in the metadata store.

    An existing row for the same key has its node replaced
    (``ON DUPLICATE KEY UPDATE``).  Connects first if no connection is
    open.
    """
    if not self._connected:
        self.sqlconnect()

    logline = b"ADD BOOKMARKS %r bookmark: %r node: %r" % (
        self.reponame,
        bookmark,
        node,
    )
    self.log.info(logline)

    upsertsql = (
        b"INSERT INTO bookmarkstonode(bookmark, node, reponame) "
        b"VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE node=VALUES(node)"
    )
    self.sqlcursor.execute(
        upsertsql, params=(bookmark, node, self.reponame)
    )
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204
def addmanybookmarks(self, bookmarks):
    """Record several bookmark -> node mappings with a single INSERT.

    ``bookmarks`` is a mapping of bookmark name to node.  Existing rows
    have their node replaced (``ON DUPLICATE KEY UPDATE``).  An empty
    mapping is a no-op.  Connects first if no connection is open.
    """
    if not bookmarks:
        # Nothing to insert; a VALUES clause with no row groups would be
        # a SQL syntax error.
        return

    if not self._connected:
        self.sqlconnect()

    # One "(%s, %s, %s)" placeholder group per bookmark, with the matching
    # parameters flattened in the same order.
    placeholders = []
    values = []
    for bookmark, node in bookmarks.items():
        placeholders.append(b'(%s, %s, %s)')
        values.extend((bookmark, node, self.reponame))

    self.sqlcursor.execute(
        b"INSERT INTO bookmarkstonode(bookmark, node, reponame) "
        b"VALUES %s ON DUPLICATE KEY UPDATE node=VALUES(node)"
        % b','.join(placeholders),
        params=values,
    )
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204
def deletebookmarks(self, patterns):
    """Delete the bookmarks of this repo matching any of ``patterns``.

    ``patterns`` is an iterable of bookmark patterns; each one is
    converted with ``_convertbookmarkpattern`` and used in a ``LIKE``
    match.  The deletes are issued on the open cursor and this method
    does not commit them itself.  Connects first if no connection is open.
    """
    if not self._connected:
        self.sqlconnect()
    # Format with %r and a 1-tuple: bytes ``%s`` rejects a list, and a
    # bare tuple argument would be unpacked as several format arguments.
    self.log.info(b"DELETE BOOKMARKS: %r" % (patterns,))
    for pattern in patterns:
        pattern = _convertbookmarkpattern(pattern)
        self.sqlcursor.execute(
            b"DELETE from bookmarkstonode WHERE bookmark LIKE (%s) "
            b"and reponame = %s",
            params=(pattern, self.reponame),
        )
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204
def getbundle(self, node):
    """Return the bundleid of the bundle that contains ``node``.

    Returns None unless the lookup yields exactly one single-column row.
    Connects first if no connection is open.
    """
    if not self._connected:
        self.sqlconnect()
    self.log.info(b"GET BUNDLE %r %r" % (self.reponame, node))

    lookupsql = (
        b"SELECT bundle from nodestobundle "
        b"WHERE node = %s AND reponame = %s"
    )
    self.sqlcursor.execute(lookupsql, params=(node, self.reponame))
    rows = self.sqlcursor.fetchall()

    if len(rows) == 1 and len(rows[0]) == 1:
        bundle = rows[0][0]
        self.log.info(b"Found bundle %r" % bundle)
        return bundle

    self.log.info(b"No matching node")
    return None
def getnode(self, bookmark):
    """Return the node recorded for ``bookmark``, or None.

    None is returned unless the lookup yields exactly one single-column
    row.  Connects first if no connection is open.
    """
    if not self._connected:
        self.sqlconnect()

    logline = b"GET NODE reponame: %r bookmark: %r" % (
        self.reponame,
        bookmark,
    )
    self.log.info(logline)

    lookupsql = (
        b"SELECT node from bookmarkstonode WHERE "
        b"bookmark = %s AND reponame = %s"
    )
    self.sqlcursor.execute(lookupsql, params=(bookmark, self.reponame))
    rows = self.sqlcursor.fetchall()

    if len(rows) == 1 and len(rows[0]) == 1:
        node = rows[0][0]
        self.log.info(b"Found node %r" % node)
        return node

    self.log.info(b"No matching bookmark")
    return None
def getbookmarks(self, query):
    """Return a dict of bookmark -> node for bookmarks matching ``query``.

    ``query`` is a bookmark pattern; it is converted with
    ``_convertbookmarkpattern`` and matched with ``LIKE`` within this
    repo.  Rows that do not have exactly two columns are logged and
    skipped.  Connects first if no connection is open.
    """
    if not self._connected:
        self.sqlconnect()
    self.log.info(
        b"QUERY BOOKMARKS reponame: %r query: %r" % (self.reponame, query)
    )
    query = _convertbookmarkpattern(query)
    self.sqlcursor.execute(
        b"SELECT bookmark, node from bookmarkstonode WHERE "
        b"reponame = %s AND bookmark LIKE %s",
        params=(self.reponame, query),
    )
    result = self.sqlcursor.fetchall()
    bookmarks = {}
    for row in result:
        if len(row) != 2:
            # Wrap the row in a 1-tuple and use %r: the row is itself a
            # sequence of unexpected length, so passing it bare to ``%``
            # would raise instead of logging.
            self.log.info(b"Bad row returned: %r" % (row,))
            continue
        bookmarks[row[0]] = row[1]
    return bookmarks
def saveoptionaljsonmetadata(self, node, jsonmetadata):
    """Store ``jsonmetadata`` in the ``optional_json_metadata`` column.

    Updates the ``nodesmetadata`` row(s) matching this repo and ``node``.
    Connects first if no connection is open.
    """
    if not self._connected:
        self.sqlconnect()

    logtemplate = (
        b"INSERT METADATA, QUERY BOOKMARKS reponame: %r "
        b"node: %r, jsonmetadata: %s"
    )
    self.log.info(logtemplate % (self.reponame, node, jsonmetadata))

    updatesql = (
        b"UPDATE nodesmetadata SET optional_json_metadata=%s WHERE "
        b"reponame=%s AND node=%s"
    )
    self.sqlcursor.execute(
        updatesql, params=(jsonmetadata, self.reponame, node)
    )
Pulkit Goyal
infinitepush: move the extension to core from fb-hgext...
r37204
class CustomConverter(mysql.connector.conversion.MySQLConverter):
    """Ensure that string-like column values are returned as ``bytes``
    (versus the default byte arrays).

    Applies to the STRING, VAR_STRING and BLOB conversions.
    """

    @staticmethod
    def _asbytes(value):
        """Return bytes-like ``value`` as ``bytes``; otherwise ``str(value)``.

        ``str()`` on a bytes-like object gives its repr text (for example
        "bytearray(b'ab')") rather than its content, so bytes-like input is
        copied into ``bytes`` instead.
        """
        if isinstance(value, (bytes, bytearray, memoryview)):
            return bytes(value)
        return str(value)

    def _STRING_to_python(self, value, dsc=None):
        return self._asbytes(value)

    def _VAR_STRING_to_python(self, value, dsc=None):
        return self._asbytes(value)

    def _BLOB_to_python(self, value, dsc=None):
        return self._asbytes(value)