transaction: allow registering a finalization callback...
Pierre-Yves David
r23204:10beda5b default
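The patch below adds transaction.addfinalize(category, callback), which registers a callback to run while the transaction is being closed: after the registered file generators have produced their files and before onclose() fires, with categories invoked in sorted order. A rough usage sketch, not taken from the patch, might look like the following; the store path, the report helper, and the 'flushcaches' category and callback are illustrative assumptions, and it presumes an hg-era vfs class such as mercurial.scmutil.vfs.

# illustrative sketch only -- not part of this changeset
import sys
from mercurial import scmutil, transaction

def report(msg):
    sys.stderr.write(msg)

# assumed: scmutil.vfs from the same Mercurial era; the path is made up
storevfs = scmutil.vfs('/path/to/repo/.hg/store')

# note: in this revision close() runs generators and finalizers only when
# onclose is set (see the guard in close() below)
tr = transaction.transaction(report, storevfs, 'journal',
                             onclose=lambda: report('transaction closed\n'))
try:
    tr.add('somefile', 0)  # record the offset to truncate back to on abort

    def flushcaches():
        report('finalize: flushing derived caches\n')

    # new in this changeset: called at close(), after _generatefiles()
    # and before onclose(); categories run in sorted order
    tr.addfinalize('flushcaches', flushcaches)
    tr.close()
finally:
    tr.release()

The finalization callbacks mirror the existing addpending/writepending pattern: one callback per category, with later registrations for the same category overwriting earlier ones.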
@@ -1,392 +1,406 @@
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 from i18n import _
15 15 import errno
16 16 import error, util
17 17
18 18 version = 1
19 19
20 20 def active(func):
21 21 def _active(self, *args, **kwds):
22 22 if self.count == 0:
23 23 raise error.Abort(_(
24 24 'cannot use transaction when it is already committed/aborted'))
25 25 return func(self, *args, **kwds)
26 26 return _active
27 27
28 28 def _playback(journal, report, opener, entries, backupentries, unlink=True):
29 29 for f, o, _ignore in entries:
30 30 if o or not unlink:
31 31 try:
32 32 fp = opener(f, 'a')
33 33 fp.truncate(o)
34 34 fp.close()
35 35 except IOError:
36 36 report(_("failed to truncate %s\n") % f)
37 37 raise
38 38 else:
39 39 try:
40 40 opener.unlink(f)
41 41 except (IOError, OSError), inst:
42 42 if inst.errno != errno.ENOENT:
43 43 raise
44 44
45 45 backupfiles = []
46 46 for f, b, _ignore in backupentries:
47 47 filepath = opener.join(f)
48 48 backuppath = opener.join(b)
49 49 try:
50 50 util.copyfile(backuppath, filepath)
51 51 backupfiles.append(b)
52 52 except IOError:
53 53 report(_("failed to recover %s\n") % f)
54 54 raise
55 55
56 56 opener.unlink(journal)
57 57 backuppath = "%s.backupfiles" % journal
58 58 if opener.exists(backuppath):
59 59 opener.unlink(backuppath)
60 60 for f in backupfiles:
61 61 opener.unlink(f)
62 62
63 63 class transaction(object):
64 64 def __init__(self, report, opener, journal, after=None, createmode=None,
65 65 onclose=None, onabort=None):
66 66 """Begin a new transaction
67 67
68 68 Begins a new transaction that allows rolling back writes in the event of
69 69 an exception.
70 70
71 71 * `after`: called after the transaction has been committed
72 72 * `createmode`: the mode of the journal file that will be created
73 73 * `onclose`: called as the transaction is closing, but before it is
74 74 closed
75 75 * `onabort`: called as the transaction is aborting, but before any files
76 76 have been truncated
77 77 """
78 78 self.count = 1
79 79 self.usages = 1
80 80 self.report = report
81 81 self.opener = opener
82 82 self.after = after
83 83 self.onclose = onclose
84 84 self.onabort = onabort
85 85 self.entries = []
86 86 self.backupentries = []
87 87 self.map = {}
88 88 self.backupmap = {}
89 89 self.journal = journal
90 90 self._queue = []
91 91 # a dict of arguments to be passed to hooks
92 92 self.hookargs = {}
93 93
94 94 self.backupjournal = "%s.backupfiles" % journal
95 95 self.file = opener.open(self.journal, "w")
96 96 self.backupsfile = opener.open(self.backupjournal, 'w')
97 97 self.backupsfile.write('%d\n' % version)
98 98 if createmode is not None:
99 99 opener.chmod(self.journal, createmode & 0666)
100 100 opener.chmod(self.backupjournal, createmode & 0666)
101 101
102 102 # hold file generations to be performed on commit
103 103 self._filegenerators = {}
104 104 # hold callbacks to write pending data for hooks
105 105 self._pendingcallback = {}
106 106 # True if any pending data has ever been written
107 107 self._anypending = False
108 # holds callbacks to call when writing the transaction
109 self._finalizecallback = {}
108 110
109 111 def __del__(self):
110 112 if self.journal:
111 113 self._abort()
112 114
113 115 @active
114 116 def startgroup(self):
115 117 self._queue.append(([], []))
116 118
117 119 @active
118 120 def endgroup(self):
119 121 q = self._queue.pop()
120 122 self.entries.extend(q[0])
121 123 self.backupentries.extend(q[1])
122 124
123 125 offsets = []
124 126 backups = []
125 127 for f, o, _data in q[0]:
126 128 offsets.append((f, o))
127 129
128 130 for f, b, _data in q[1]:
129 131 backups.append((f, b))
130 132
131 133 d = ''.join(['%s\0%d\n' % (f, o) for f, o in offsets])
132 134 self.file.write(d)
133 135 self.file.flush()
134 136
135 137 d = ''.join(['%s\0%s\n' % (f, b) for f, b in backups])
136 138 self.backupsfile.write(d)
137 139 self.backupsfile.flush()
138 140
139 141 @active
140 142 def add(self, file, offset, data=None):
141 143 if file in self.map or file in self.backupmap:
142 144 return
143 145 if self._queue:
144 146 self._queue[-1][0].append((file, offset, data))
145 147 return
146 148
147 149 self.entries.append((file, offset, data))
148 150 self.map[file] = len(self.entries) - 1
149 151 # add enough data to the journal to do the truncate
150 152 self.file.write("%s\0%d\n" % (file, offset))
151 153 self.file.flush()
152 154
153 155 @active
154 156 def addbackup(self, file, hardlink=True, vfs=None):
155 157 """Adds a backup of the file to the transaction
156 158
157 159 Calling addbackup() creates a hardlink backup of the specified file
158 160 that is used to recover the file in the event of the transaction
159 161 aborting.
160 162
161 163 * `file`: the file path, relative to .hg/store
162 164 * `hardlink`: use a hardlink to quickly create the backup
163 165 """
164 166
165 167 if file in self.map or file in self.backupmap:
166 168 return
167 169 backupfile = "%s.backup.%s" % (self.journal, file)
168 170 if vfs is None:
169 171 vfs = self.opener
170 172 if vfs.exists(file):
171 173 filepath = vfs.join(file)
172 174 backuppath = self.opener.join(backupfile)
173 175 util.copyfiles(filepath, backuppath, hardlink=hardlink)
174 176 else:
175 177 self.add(file, 0)
176 178 return
177 179
178 180 if self._queue:
179 181 self._queue[-1][1].append((file, backupfile))
180 182 return
181 183
182 184 self.backupentries.append((file, backupfile, None))
183 185 self.backupmap[file] = len(self.backupentries) - 1
184 186 self.backupsfile.write("%s\0%s\n" % (file, backupfile))
185 187 self.backupsfile.flush()
186 188
187 189 @active
188 190 def addfilegenerator(self, genid, filenames, genfunc, order=0, vfs=None):
189 191 """add a function to generates some files at transaction commit
190 192
191 193 The `genfunc` argument is a function capable of generating proper
192 194 content of each entry in the `filenames` tuple.
193 195
194 196 At transaction close time, `genfunc` will be called with one file
195 197 object argument per entry in `filenames`.
196 198
197 199 The transaction itself is responsible for the backup, creation and
198 200 final write of such files.
199 201
200 202 The `genid` argument is used to ensure the same set of files is only
201 203 generated once. A call to `addfilegenerator` for a `genid` already
202 204 present will overwrite the old entry.
203 205
204 206 The `order` argument may be used to control the order in which multiple
205 207 generators will be executed.
206 208 """
207 209 # For now, we are unable to do proper backup and restore of files on a
208 210 # custom vfs, except for bookmarks, which are handled outside this mechanism.
209 211 assert vfs is None or filenames == ('bookmarks',)
210 212 self._filegenerators[genid] = (order, filenames, genfunc, vfs)
211 213
212 214 def _generatefiles(self):
213 215 # write files registered for generation
214 216 for entry in sorted(self._filegenerators.values()):
215 217 order, filenames, genfunc, vfs = entry
216 218 if vfs is None:
217 219 vfs = self.opener
218 220 files = []
219 221 try:
220 222 for name in filenames:
221 223 # Some files are already backed up when creating the
222 224 # localrepo. Until this is properly fixed we disable the
223 225 # backup for them.
224 226 if name not in ('phaseroots', 'bookmarks'):
225 227 self.addbackup(name)
226 228 files.append(vfs(name, 'w', atomictemp=True))
227 229 genfunc(*files)
228 230 finally:
229 231 for f in files:
230 232 f.close()
231 233
232 234 @active
233 235 def find(self, file):
234 236 if file in self.map:
235 237 return self.entries[self.map[file]]
236 238 if file in self.backupmap:
237 239 return self.backupentries[self.backupmap[file]]
238 240 return None
239 241
240 242 @active
241 243 def replace(self, file, offset, data=None):
242 244 '''
243 245 replace can only replace already committed entries
244 246 that are not pending in the queue
245 247 '''
246 248
247 249 if file not in self.map:
248 250 raise KeyError(file)
249 251 index = self.map[file]
250 252 self.entries[index] = (file, offset, data)
251 253 self.file.write("%s\0%d\n" % (file, offset))
252 254 self.file.flush()
253 255
254 256 @active
255 257 def nest(self):
256 258 self.count += 1
257 259 self.usages += 1
258 260 return self
259 261
260 262 def release(self):
261 263 if self.count > 0:
262 264 self.usages -= 1
263 265 # if the transaction scopes are left without being closed, fail
264 266 if self.count > 0 and self.usages == 0:
265 267 self._abort()
266 268
267 269 def running(self):
268 270 return self.count > 0
269 271
270 272 def addpending(self, category, callback):
271 273 """add a callback to be called when the transaction is pending
272 274
273 275 Category is a unique identifier to allow overwriting an old callback
274 276 with a newer callback.
275 277 """
276 278 self._pendingcallback[category] = callback
277 279
278 280 @active
279 281 def writepending(self):
280 282 '''write pending file to temporary version
281 283
282 284 This is used to allow hooks to view a transaction before commit'''
283 285 categories = sorted(self._pendingcallback)
284 286 for cat in categories:
285 287 # remove callback since the data will have been flushed
286 288 any = self._pendingcallback.pop(cat)()
287 289 self._anypending = self._anypending or any
288 290 return self._anypending
289 291
290 292 @active
293 def addfinalize(self, category, callback):
294 """add a callback to be called when the transaction is closed
295
296 Category is a unique identifier to allow overwriting old callbacks with
297 newer callbacks.
298 """
299 self._finalizecallback[category] = callback
300
301 @active
291 302 def close(self):
292 303 '''commit the transaction'''
293 304 if self.count == 1 and self.onclose is not None:
294 305 self._generatefiles()
306 categories = sorted(self._finalizecallback)
307 for cat in categories:
308 self._finalizecallback[cat]()
295 309 self.onclose()
296 310
297 311 self.count -= 1
298 312 if self.count != 0:
299 313 return
300 314 self.file.close()
301 315 self.backupsfile.close()
302 316 self.entries = []
303 317 if self.after:
304 318 self.after()
305 319 if self.opener.isfile(self.journal):
306 320 self.opener.unlink(self.journal)
307 321 if self.opener.isfile(self.backupjournal):
308 322 self.opener.unlink(self.backupjournal)
309 323 for _f, b, _ignore in self.backupentries:
310 324 self.opener.unlink(b)
311 325 self.backupentries = []
312 326 self.journal = None
313 327
314 328 @active
315 329 def abort(self):
316 330 '''abort the transaction (generally called on error, or when the
317 331 transaction is not explicitly committed before going out of
318 332 scope)'''
319 333 self._abort()
320 334
321 335 def _abort(self):
322 336 self.count = 0
323 337 self.usages = 0
324 338 self.file.close()
325 339 self.backupsfile.close()
326 340
327 341 if self.onabort is not None:
328 342 self.onabort()
329 343
330 344 try:
331 345 if not self.entries and not self.backupentries:
332 346 if self.journal:
333 347 self.opener.unlink(self.journal)
334 348 if self.backupjournal:
335 349 self.opener.unlink(self.backupjournal)
336 350 return
337 351
338 352 self.report(_("transaction abort!\n"))
339 353
340 354 try:
341 355 _playback(self.journal, self.report, self.opener,
342 356 self.entries, self.backupentries, False)
343 357 self.report(_("rollback completed\n"))
344 358 except Exception:
345 359 self.report(_("rollback failed - please run hg recover\n"))
346 360 finally:
347 361 self.journal = None
348 362
349 363
350 364 def rollback(opener, file, report):
351 365 """Rolls back the transaction contained in the given file
352 366
353 367 Reads the entries in the specified file, and the corresponding
354 368 '*.backupfiles' file, to recover from an incomplete transaction.
355 369
356 370 * `file`: a file containing a list of entries, specifying where
357 371 to truncate each file. The file should contain a list of
358 372 file\0offset pairs, delimited by newlines. The corresponding
359 373 '*.backupfiles' file should contain a list of file\0backupfile
360 374 pairs, delimited by newlines.
361 375 """
362 376 entries = []
363 377 backupentries = []
364 378
365 379 fp = opener.open(file)
366 380 lines = fp.readlines()
367 381 fp.close()
368 382 for l in lines:
369 383 try:
370 384 f, o = l.split('\0')
371 385 entries.append((f, int(o), None))
372 386 except ValueError:
373 387 report(_("couldn't read journal entry %r!\n") % l)
374 388
375 389 backupjournal = "%s.backupfiles" % file
376 390 if opener.exists(backupjournal):
377 391 fp = opener.open(backupjournal)
378 392 lines = fp.readlines()
379 393 if lines:
380 394 ver = lines[0][:-1]
381 395 if ver == str(version):
382 396 for line in lines[1:]:
383 397 if line:
384 398 # Shave off the trailing newline
385 399 line = line[:-1]
386 400 f, b = line.split('\0')
387 401 backupentries.append((f, b, None))
388 402 else:
389 403 report(_("journal was created by a newer version of "
390 404 "Mercurial"))
391 405
392 406 _playback(file, report, opener, entries, backupentries)
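The docstrings above also describe two related hooks: addfilegenerator(), which has the transaction itself back up and write generated files at close time, and addpending()/writepending(), which let pre-close hooks inspect in-progress data. A hedged sketch follows, again not taken from the patch; every name below is illustrative, and `tr` is assumed to be an open transaction (before close()) like the one in the sketch near the top.

# illustrative sketch only -- assumes `tr` is an open transaction
def writemycache(fp):
    # at close time the transaction arranges a backup of 'mycachefile',
    # opens it with atomictemp=True and passes the file object in
    fp.write('cached data\n')

# one generator per genid; registering the same genid again overwrites it
tr.addfilegenerator('mycache', ('mycachefile',), writemycache)

# pending callbacks write temporary data so pre-close hooks can see the
# transaction; each returns True if it wrote anything
def writependingdata():
    return True  # illustrative: a real callback would write a pending file

tr.addpending('mypending', writependingdata)
tr.writepending()  # True once any category has written pending data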