##// END OF EJS Templates
transaction: mark backup-related attributes private...
transaction: mark backup-related attributes private As the transaction is gaining more functions and attributes, it is important to clarify what is part of the public API.

File last commit:

r23249:84720eab default
r23249:84720eab default
Show More
transaction.py
422 lines | 14.0 KiB | text/x-python | PythonLexer
# transaction.py - simple journaling scheme for mercurial
#
# This transaction scheme is intended to gracefully handle program
# errors and interruptions. More serious failures like system crashes
# can be recovered with an fsck-like tool. As the whole repository is
# effectively log-structured, this should amount to simply truncating
# anything that isn't referenced in the changelog.
#
# Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from i18n import _
import errno
import error, util
version = 1
def active(func):
def _active(self, *args, **kwds):
if self.count == 0:
raise error.Abort(_(
'cannot use transaction when it is already committed/aborted'))
return func(self, *args, **kwds)
return _active
def _playback(journal, report, opener, entries, backupentries, unlink=True):
for f, o, _ignore in entries:
if o or not unlink:
try:
fp = opener(f, 'a')
fp.truncate(o)
fp.close()
except IOError:
report(_("failed to truncate %s\n") % f)
raise
else:
try:
opener.unlink(f)
except (IOError, OSError), inst:
if inst.errno != errno.ENOENT:
raise
backupfiles = []
for f, b in backupentries:
filepath = opener.join(f)
backuppath = opener.join(b)
try:
util.copyfile(backuppath, filepath)
backupfiles.append(b)
except IOError:
report(_("failed to recover %s\n") % f)
raise
opener.unlink(journal)
backuppath = "%s.backupfiles" % journal
if opener.exists(backuppath):
opener.unlink(backuppath)
for f in backupfiles:
opener.unlink(f)
class transaction(object):
def __init__(self, report, opener, journal, after=None, createmode=None,
onclose=None, onabort=None):
"""Begin a new transaction
Begins a new transaction that allows rolling back writes in the event of
an exception.
* `after`: called after the transaction has been committed
* `createmode`: the mode of the journal file that will be created
* `onclose`: called as the transaction is closing, but before it is
closed
* `onabort`: called as the transaction is aborting, but before any files
have been truncated
"""
self.count = 1
self.usages = 1
self.report = report
self.opener = opener
self.after = after
self.onclose = onclose
self.onabort = onabort
self.entries = []
self.map = {}
# a list of ('path', 'backuppath') entries.
self._backupentries = []
self._backupmap = {}
self.journal = journal
self._queue = []
# a dict of arguments to be passed to hooks
self.hookargs = {}
self._backupjournal = "%s.backupfiles" % journal
self.file = opener.open(self.journal, "w")
self._backupsfile = opener.open(self._backupjournal, 'w')
self._backupsfile.write('%d\n' % version)
if createmode is not None:
opener.chmod(self.journal, createmode & 0666)
opener.chmod(self._backupjournal, createmode & 0666)
# hold file generations to be performed on commit
self._filegenerators = {}
# hold callbalk to write pending data for hooks
self._pendingcallback = {}
# True is any pending data have been written ever
self._anypending = False
# holds callback to call when writing the transaction
self._finalizecallback = {}
# hold callbalk for post transaction close
self._postclosecallback = {}
def __del__(self):
if self.journal:
self._abort()
@active
def startgroup(self):
self._queue.append(([], []))
@active
def endgroup(self):
q = self._queue.pop()
self.entries.extend(q[0])
self._backupentries.extend(q[1])
offsets = []
backups = []
for f, o, _data in q[0]:
offsets.append((f, o))
for f, b in q[1]:
backups.append((f, b))
d = ''.join(['%s\0%d\n' % (f, o) for f, o in offsets])
self.file.write(d)
self.file.flush()
d = ''.join(['%s\0%s\n' % (f, b) for f, b in backups])
self._backupsfile.write(d)
self._backupsfile.flush()
@active
def add(self, file, offset, data=None):
if file in self.map or file in self._backupmap:
return
if self._queue:
self._queue[-1][0].append((file, offset, data))
return
self.entries.append((file, offset, data))
self.map[file] = len(self.entries) - 1
# add enough data to the journal to do the truncate
self.file.write("%s\0%d\n" % (file, offset))
self.file.flush()
@active
def addbackup(self, file, hardlink=True, vfs=None):
"""Adds a backup of the file to the transaction
Calling addbackup() creates a hardlink backup of the specified file
that is used to recover the file in the event of the transaction
aborting.
* `file`: the file path, relative to .hg/store
* `hardlink`: use a hardlink to quickly create the backup
"""
if file in self.map or file in self._backupmap:
return
backupfile = "%s.backup.%s" % (self.journal, file)
if vfs is None:
vfs = self.opener
if vfs.exists(file):
filepath = vfs.join(file)
backuppath = self.opener.join(backupfile)
util.copyfiles(filepath, backuppath, hardlink=hardlink)
else:
self.add(file, 0)
return
if self._queue:
self._queue[-1][1].append((file, backupfile))
return
self._backupentries.append((file, backupfile))
self._backupmap[file] = len(self._backupentries) - 1
self._backupsfile.write("%s\0%s\n" % (file, backupfile))
self._backupsfile.flush()
@active
def addfilegenerator(self, genid, filenames, genfunc, order=0, vfs=None):
"""add a function to generates some files at transaction commit
The `genfunc` argument is a function capable of generating proper
content of each entry in the `filename` tuple.
At transaction close time, `genfunc` will be called with one file
object argument per entries in `filenames`.
The transaction itself is responsible for the backup, creation and
final write of such file.
The `genid` argument is used to ensure the same set of file is only
generated once. Call to `addfilegenerator` for a `genid` already
present will overwrite the old entry.
The `order` argument may be used to control the order in which multiple
generator will be executed.
"""
# For now, we are unable to do proper backup and restore of custom vfs
# but for bookmarks that are handled outside this mechanism.
assert vfs is None or filenames == ('bookmarks',)
self._filegenerators[genid] = (order, filenames, genfunc, vfs)
def _generatefiles(self):
# write files registered for generation
for entry in sorted(self._filegenerators.values()):
order, filenames, genfunc, vfs = entry
if vfs is None:
vfs = self.opener
files = []
try:
for name in filenames:
# Some files are already backed up when creating the
# localrepo. Until this is properly fixed we disable the
# backup for them.
if name not in ('phaseroots', 'bookmarks'):
self.addbackup(name)
files.append(vfs(name, 'w', atomictemp=True))
genfunc(*files)
finally:
for f in files:
f.close()
@active
def find(self, file):
if file in self.map:
return self.entries[self.map[file]]
if file in self._backupmap:
return self._backupentries[self._backupmap[file]]
return None
@active
def replace(self, file, offset, data=None):
'''
replace can only replace already committed entries
that are not pending in the queue
'''
if file not in self.map:
raise KeyError(file)
index = self.map[file]
self.entries[index] = (file, offset, data)
self.file.write("%s\0%d\n" % (file, offset))
self.file.flush()
@active
def nest(self):
self.count += 1
self.usages += 1
return self
def release(self):
if self.count > 0:
self.usages -= 1
# if the transaction scopes are left without being closed, fail
if self.count > 0 and self.usages == 0:
self._abort()
def running(self):
return self.count > 0
def addpending(self, category, callback):
"""add a callback to be called when the transaction is pending
Category is a unique identifier to allow overwriting an old callback
with a newer callback.
"""
self._pendingcallback[category] = callback
@active
def writepending(self):
'''write pending file to temporary version
This is used to allow hooks to view a transaction before commit'''
categories = sorted(self._pendingcallback)
for cat in categories:
# remove callback since the data will have been flushed
any = self._pendingcallback.pop(cat)()
self._anypending = self._anypending or any
return self._anypending
@active
def addfinalize(self, category, callback):
"""add a callback to be called when the transaction is closed
Category is a unique identifier to allow overwriting old callbacks with
newer callbacks.
"""
self._finalizecallback[category] = callback
@active
def addpostclose(self, category, callback):
"""add a callback to be called after the transaction is closed
Category is a unique identifier to allow overwriting an old callback
with a newer callback.
"""
self._postclosecallback[category] = callback
@active
def close(self):
'''commit the transaction'''
if self.count == 1 and self.onclose is not None:
self._generatefiles()
categories = sorted(self._finalizecallback)
for cat in categories:
self._finalizecallback[cat]()
self.onclose()
self.count -= 1
if self.count != 0:
return
self.file.close()
self._backupsfile.close()
self.entries = []
if self.after:
self.after()
if self.opener.isfile(self.journal):
self.opener.unlink(self.journal)
if self.opener.isfile(self._backupjournal):
self.opener.unlink(self._backupjournal)
for _f, b in self._backupentries:
self.opener.unlink(b)
self._backupentries = []
self.journal = None
# run post close action
categories = sorted(self._postclosecallback)
for cat in categories:
self._postclosecallback[cat]()
@active
def abort(self):
'''abort the transaction (generally called on error, or when the
transaction is not explicitly committed before going out of
scope)'''
self._abort()
def _abort(self):
self.count = 0
self.usages = 0
self.file.close()
self._backupsfile.close()
if self.onabort is not None:
self.onabort()
try:
if not self.entries and not self._backupentries:
if self.journal:
self.opener.unlink(self.journal)
if self._backupjournal:
self.opener.unlink(self._backupjournal)
return
self.report(_("transaction abort!\n"))
try:
_playback(self.journal, self.report, self.opener,
self.entries, self._backupentries, False)
self.report(_("rollback completed\n"))
except Exception:
self.report(_("rollback failed - please run hg recover\n"))
finally:
self.journal = None
def rollback(opener, file, report):
"""Rolls back the transaction contained in the given file
Reads the entries in the specified file, and the corresponding
'*.backupfiles' file, to recover from an incomplete transaction.
* `file`: a file containing a list of entries, specifying where
to truncate each file. The file should contain a list of
file\0offset pairs, delimited by newlines. The corresponding
'*.backupfiles' file should contain a list of file\0backupfile
pairs, delimited by \0.
"""
entries = []
backupentries = []
fp = opener.open(file)
lines = fp.readlines()
fp.close()
for l in lines:
try:
f, o = l.split('\0')
entries.append((f, int(o), None))
except ValueError:
report(_("couldn't read journal entry %r!\n") % l)
backupjournal = "%s.backupfiles" % file
if opener.exists(backupjournal):
fp = opener.open(backupjournal)
lines = fp.readlines()
if lines:
ver = lines[0][:-1]
if ver == str(version):
for line in lines[1:]:
if line:
# Shave off the trailing newline
line = line[:-1]
f, b = line.split('\0')
backupentries.append((f, b))
else:
report(_("journal was created by a newer version of "
"Mercurial"))
_playback(file, report, opener, entries, backupentries)