transaction: allow registering a finalization callback...
Pierre-Yves David
r23204:10beda5b default
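This changeset adds a `_finalizecallback` dict, an `addfinalize()` method for registering callbacks by category, and a loop in `close()` that runs those callbacks after `_generatefiles()` and before `onclose()`. A minimal caller-side sketch of the new hook follows; the `vfs`, `ui.warn`, journal name and cache-writing logic are illustrative assumptions, not part of this change, and note that in this version the finalize callbacks only run when an `onclose` callback was supplied at construction time:

# Hypothetical usage sketch of the new addfinalize() hook.
# `vfs` and `ui.warn` stand in for a real store vfs and report function.
tr = transaction(ui.warn, vfs, 'journal', onclose=lambda: None)
try:
    def writecache():
        # runs inside close(), after _generatefiles() and before onclose()
        f = vfs('somecache', 'w', atomictemp=True)
        f.write('derived data\n')
        f.close()
    tr.addfinalize('somecache', writecache)
    tr.add('data/somefile.i', 0)   # journal writes as usual
    tr.close()                     # finalize callbacks fire here
finally:
    tr.release()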
@@ -1,392 +1,406 @@
 # transaction.py - simple journaling scheme for mercurial
 #
 # This transaction scheme is intended to gracefully handle program
 # errors and interruptions. More serious failures like system crashes
 # can be recovered with an fsck-like tool. As the whole repository is
 # effectively log-structured, this should amount to simply truncating
 # anything that isn't referenced in the changelog.
 #
 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from i18n import _
 import errno
 import error, util
 
 version = 1
 
 def active(func):
     def _active(self, *args, **kwds):
         if self.count == 0:
             raise error.Abort(_(
                 'cannot use transaction when it is already committed/aborted'))
         return func(self, *args, **kwds)
     return _active
 
 def _playback(journal, report, opener, entries, backupentries, unlink=True):
     for f, o, _ignore in entries:
         if o or not unlink:
             try:
                 fp = opener(f, 'a')
                 fp.truncate(o)
                 fp.close()
             except IOError:
                 report(_("failed to truncate %s\n") % f)
                 raise
         else:
             try:
                 opener.unlink(f)
             except (IOError, OSError), inst:
                 if inst.errno != errno.ENOENT:
                     raise
 
     backupfiles = []
     for f, b, _ignore in backupentries:
         filepath = opener.join(f)
         backuppath = opener.join(b)
         try:
             util.copyfile(backuppath, filepath)
             backupfiles.append(b)
         except IOError:
             report(_("failed to recover %s\n") % f)
             raise
 
     opener.unlink(journal)
     backuppath = "%s.backupfiles" % journal
     if opener.exists(backuppath):
         opener.unlink(backuppath)
     for f in backupfiles:
         opener.unlink(f)
 
 class transaction(object):
     def __init__(self, report, opener, journal, after=None, createmode=None,
                  onclose=None, onabort=None):
         """Begin a new transaction
 
         Begins a new transaction that allows rolling back writes in the event of
         an exception.
 
         * `after`: called after the transaction has been committed
         * `createmode`: the mode of the journal file that will be created
         * `onclose`: called as the transaction is closing, but before it is
         closed
         * `onabort`: called as the transaction is aborting, but before any files
         have been truncated
         """
         self.count = 1
         self.usages = 1
         self.report = report
         self.opener = opener
         self.after = after
         self.onclose = onclose
         self.onabort = onabort
         self.entries = []
         self.backupentries = []
         self.map = {}
         self.backupmap = {}
         self.journal = journal
         self._queue = []
         # a dict of arguments to be passed to hooks
         self.hookargs = {}
 
         self.backupjournal = "%s.backupfiles" % journal
         self.file = opener.open(self.journal, "w")
         self.backupsfile = opener.open(self.backupjournal, 'w')
         self.backupsfile.write('%d\n' % version)
         if createmode is not None:
             opener.chmod(self.journal, createmode & 0666)
             opener.chmod(self.backupjournal, createmode & 0666)
 
         # hold file generations to be performed on commit
         self._filegenerators = {}
         # hold callbacks to write pending data for hooks
         self._pendingcallback = {}
         # True if any pending data has ever been written
         self._anypending = False
+        # holds callbacks to call when writing the transaction
+        self._finalizecallback = {}
 
     def __del__(self):
         if self.journal:
             self._abort()
 
     @active
     def startgroup(self):
         self._queue.append(([], []))
 
     @active
     def endgroup(self):
         q = self._queue.pop()
         self.entries.extend(q[0])
         self.backupentries.extend(q[1])
 
         offsets = []
         backups = []
         for f, o, _data in q[0]:
             offsets.append((f, o))
 
         for f, b, _data in q[1]:
             backups.append((f, b))
 
         d = ''.join(['%s\0%d\n' % (f, o) for f, o in offsets])
         self.file.write(d)
         self.file.flush()
 
         d = ''.join(['%s\0%s\n' % (f, b) for f, b in backups])
         self.backupsfile.write(d)
         self.backupsfile.flush()
 
     @active
     def add(self, file, offset, data=None):
         if file in self.map or file in self.backupmap:
             return
         if self._queue:
             self._queue[-1][0].append((file, offset, data))
             return
 
         self.entries.append((file, offset, data))
         self.map[file] = len(self.entries) - 1
         # add enough data to the journal to do the truncate
         self.file.write("%s\0%d\n" % (file, offset))
         self.file.flush()
 
     @active
     def addbackup(self, file, hardlink=True, vfs=None):
         """Adds a backup of the file to the transaction
 
         Calling addbackup() creates a hardlink backup of the specified file
         that is used to recover the file in the event of the transaction
         aborting.
 
         * `file`: the file path, relative to .hg/store
         * `hardlink`: use a hardlink to quickly create the backup
         """
 
         if file in self.map or file in self.backupmap:
             return
         backupfile = "%s.backup.%s" % (self.journal, file)
         if vfs is None:
             vfs = self.opener
         if vfs.exists(file):
             filepath = vfs.join(file)
             backuppath = self.opener.join(backupfile)
             util.copyfiles(filepath, backuppath, hardlink=hardlink)
         else:
             self.add(file, 0)
             return
 
         if self._queue:
             self._queue[-1][1].append((file, backupfile))
             return
 
         self.backupentries.append((file, backupfile, None))
         self.backupmap[file] = len(self.backupentries) - 1
         self.backupsfile.write("%s\0%s\n" % (file, backupfile))
         self.backupsfile.flush()
 
     @active
     def addfilegenerator(self, genid, filenames, genfunc, order=0, vfs=None):
         """add a function to generate some files at transaction commit
 
         The `genfunc` argument is a function capable of generating proper
         content for each entry in the `filenames` tuple.
 
         At transaction close time, `genfunc` will be called with one file
         object argument per entry in `filenames`.
 
         The transaction itself is responsible for the backup, creation and
         final write of such files.
 
         The `genid` argument is used to ensure the same set of files is only
         generated once. A call to `addfilegenerator` for a `genid` already
         present will overwrite the old entry.
 
         The `order` argument may be used to control the order in which multiple
         generators will be executed.
         """
         # For now, we are unable to do proper backup and restore of custom vfs
         # but for bookmarks that are handled outside this mechanism.
         assert vfs is None or filenames == ('bookmarks',)
         self._filegenerators[genid] = (order, filenames, genfunc, vfs)
 
     def _generatefiles(self):
         # write files registered for generation
         for entry in sorted(self._filegenerators.values()):
             order, filenames, genfunc, vfs = entry
             if vfs is None:
                 vfs = self.opener
             files = []
             try:
                 for name in filenames:
                     # Some files are already backed up when creating the
                     # localrepo. Until this is properly fixed we disable the
                     # backup for them.
                     if name not in ('phaseroots', 'bookmarks'):
                         self.addbackup(name)
                     files.append(vfs(name, 'w', atomictemp=True))
                 genfunc(*files)
             finally:
                 for f in files:
                     f.close()
 
     @active
     def find(self, file):
         if file in self.map:
             return self.entries[self.map[file]]
         if file in self.backupmap:
             return self.backupentries[self.backupmap[file]]
         return None
 
     @active
     def replace(self, file, offset, data=None):
         '''
         replace can only replace already committed entries
         that are not pending in the queue
         '''
 
         if file not in self.map:
             raise KeyError(file)
         index = self.map[file]
         self.entries[index] = (file, offset, data)
         self.file.write("%s\0%d\n" % (file, offset))
         self.file.flush()
 
     @active
     def nest(self):
         self.count += 1
         self.usages += 1
         return self
 
     def release(self):
         if self.count > 0:
             self.usages -= 1
         # if the transaction scopes are left without being closed, fail
         if self.count > 0 and self.usages == 0:
             self._abort()
 
     def running(self):
         return self.count > 0
 
     def addpending(self, category, callback):
         """add a callback to be called when the transaction is pending
 
         Category is a unique identifier to allow overwriting an old callback
         with a newer callback.
         """
         self._pendingcallback[category] = callback
 
     @active
     def writepending(self):
         '''write pending file to temporary version
 
         This is used to allow hooks to view a transaction before commit'''
         categories = sorted(self._pendingcallback)
         for cat in categories:
             # remove callback since the data will have been flushed
             any = self._pendingcallback.pop(cat)()
             self._anypending = self._anypending or any
         return self._anypending
 
     @active
+    def addfinalize(self, category, callback):
+        """add a callback to be called when the transaction is closed
+
+        Category is a unique identifier to allow overwriting old callbacks with
+        newer callbacks.
+        """
+        self._finalizecallback[category] = callback
+
+    @active
     def close(self):
         '''commit the transaction'''
         if self.count == 1 and self.onclose is not None:
             self._generatefiles()
+            categories = sorted(self._finalizecallback)
+            for cat in categories:
+                self._finalizecallback[cat]()
             self.onclose()
 
         self.count -= 1
         if self.count != 0:
             return
         self.file.close()
         self.backupsfile.close()
         self.entries = []
         if self.after:
             self.after()
         if self.opener.isfile(self.journal):
             self.opener.unlink(self.journal)
         if self.opener.isfile(self.backupjournal):
             self.opener.unlink(self.backupjournal)
         for _f, b, _ignore in self.backupentries:
             self.opener.unlink(b)
         self.backupentries = []
         self.journal = None
 
     @active
     def abort(self):
         '''abort the transaction (generally called on error, or when the
         transaction is not explicitly committed before going out of
         scope)'''
         self._abort()
 
     def _abort(self):
         self.count = 0
         self.usages = 0
         self.file.close()
         self.backupsfile.close()
 
         if self.onabort is not None:
             self.onabort()
 
         try:
             if not self.entries and not self.backupentries:
                 if self.journal:
                     self.opener.unlink(self.journal)
                 if self.backupjournal:
                     self.opener.unlink(self.backupjournal)
                 return
 
             self.report(_("transaction abort!\n"))
 
             try:
                 _playback(self.journal, self.report, self.opener,
                           self.entries, self.backupentries, False)
                 self.report(_("rollback completed\n"))
             except Exception:
                 self.report(_("rollback failed - please run hg recover\n"))
         finally:
             self.journal = None
 
 
 def rollback(opener, file, report):
     """Rolls back the transaction contained in the given file
 
     Reads the entries in the specified file, and the corresponding
     '*.backupfiles' file, to recover from an incomplete transaction.
 
     * `file`: a file containing a list of entries, specifying where
     to truncate each file. The file should contain a list of
     file\0offset pairs, delimited by newlines. The corresponding
     '*.backupfiles' file should contain a list of file\0backupfile
     pairs, delimited by newlines.
     """
     entries = []
     backupentries = []
 
     fp = opener.open(file)
     lines = fp.readlines()
     fp.close()
     for l in lines:
         try:
             f, o = l.split('\0')
             entries.append((f, int(o), None))
         except ValueError:
             report(_("couldn't read journal entry %r!\n") % l)
 
     backupjournal = "%s.backupfiles" % file
     if opener.exists(backupjournal):
         fp = opener.open(backupjournal)
         lines = fp.readlines()
         if lines:
             ver = lines[0][:-1]
             if ver == str(version):
                 for line in lines[1:]:
                     if line:
                         # Shave off the trailing newline
                         line = line[:-1]
                         f, b = line.split('\0')
                         backupentries.append((f, b, None))
             else:
                 report(_("journal was created by a newer version of "
                          "Mercurial"))
 
     _playback(file, report, opener, entries, backupentries)
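For comparison with the new finalize hook, here is a small sketch of the pre-existing file-generator mechanism documented in addfilegenerator() above, assuming an already-open transaction `tr`; the 'funnycache' name and its content are made up for illustration:

# genfunc receives one open file object per name in `filenames`;
# the transaction takes care of backup, atomic creation and the final write.
def genfunc(fp):
    fp.write('some derived data\n')
tr.addfilegenerator('funnycache', ('funnycache',), genfunc, order=0)
# the file is actually written from close(), via _generatefiles()

Registering again with the same genid ('funnycache') replaces the earlier entry, so the file is only generated once per transaction.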