##// END OF EJS Templates
transaction: turn a transaction into a Python context manager...
Bryan O'Sullivan -
r27862:b2145c19 default
parent child Browse files
Show More
@@ -1,562 +1,570 b''
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import errno
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 error,
21 21 util,
22 22 )
23 23
24 24 version = 2
25 25
26 26 def active(func):
27 27 def _active(self, *args, **kwds):
28 28 if self.count == 0:
29 29 raise error.Abort(_(
30 30 'cannot use transaction when it is already committed/aborted'))
31 31 return func(self, *args, **kwds)
32 32 return _active
33 33
34 34 def _playback(journal, report, opener, vfsmap, entries, backupentries,
35 35 unlink=True):
36 36 for f, o, _ignore in entries:
37 37 if o or not unlink:
38 38 try:
39 39 fp = opener(f, 'a')
40 40 fp.truncate(o)
41 41 fp.close()
42 42 except IOError:
43 43 report(_("failed to truncate %s\n") % f)
44 44 raise
45 45 else:
46 46 try:
47 47 opener.unlink(f)
48 48 except (IOError, OSError) as inst:
49 49 if inst.errno != errno.ENOENT:
50 50 raise
51 51
52 52 backupfiles = []
53 53 for l, f, b, c in backupentries:
54 54 if l not in vfsmap and c:
55 55 report("couldn't handle %s: unknown cache location %s\n"
56 56 % (b, l))
57 57 vfs = vfsmap[l]
58 58 try:
59 59 if f and b:
60 60 filepath = vfs.join(f)
61 61 backuppath = vfs.join(b)
62 62 try:
63 63 util.copyfile(backuppath, filepath)
64 64 backupfiles.append(b)
65 65 except IOError:
66 66 report(_("failed to recover %s\n") % f)
67 67 else:
68 68 target = f or b
69 69 try:
70 70 vfs.unlink(target)
71 71 except (IOError, OSError) as inst:
72 72 if inst.errno != errno.ENOENT:
73 73 raise
74 74 except (IOError, OSError, error.Abort) as inst:
75 75 if not c:
76 76 raise
77 77
78 78 backuppath = "%s.backupfiles" % journal
79 79 if opener.exists(backuppath):
80 80 opener.unlink(backuppath)
81 81 opener.unlink(journal)
82 82 try:
83 83 for f in backupfiles:
84 84 if opener.exists(f):
85 85 opener.unlink(f)
86 86 except (IOError, OSError, error.Abort) as inst:
87 87 # only pure backup file remains, it is sage to ignore any error
88 88 pass
89 89
90 90 class transaction(object):
91 91 def __init__(self, report, opener, vfsmap, journalname, undoname=None,
92 92 after=None, createmode=None, validator=None, releasefn=None):
93 93 """Begin a new transaction
94 94
95 95 Begins a new transaction that allows rolling back writes in the event of
96 96 an exception.
97 97
98 98 * `after`: called after the transaction has been committed
99 99 * `createmode`: the mode of the journal file that will be created
100 100 * `releasefn`: called after releasing (with transaction and result)
101 101 """
102 102 self.count = 1
103 103 self.usages = 1
104 104 self.report = report
105 105 # a vfs to the store content
106 106 self.opener = opener
107 107 # a map to access file in various {location -> vfs}
108 108 vfsmap = vfsmap.copy()
109 109 vfsmap[''] = opener # set default value
110 110 self._vfsmap = vfsmap
111 111 self.after = after
112 112 self.entries = []
113 113 self.map = {}
114 114 self.journal = journalname
115 115 self.undoname = undoname
116 116 self._queue = []
117 117 # A callback to validate transaction content before closing it.
118 118 # should raise exception is anything is wrong.
119 119 # target user is repository hooks.
120 120 if validator is None:
121 121 validator = lambda tr: None
122 122 self.validator = validator
123 123 # A callback to do something just after releasing transaction.
124 124 if releasefn is None:
125 125 releasefn = lambda tr, success: None
126 126 self.releasefn = releasefn
127 127
128 128 # a dict of arguments to be passed to hooks
129 129 self.hookargs = {}
130 130 self.file = opener.open(self.journal, "w")
131 131
132 132 # a list of ('location', 'path', 'backuppath', cache) entries.
133 133 # - if 'backuppath' is empty, no file existed at backup time
134 134 # - if 'path' is empty, this is a temporary transaction file
135 135 # - if 'location' is not empty, the path is outside main opener reach.
136 136 # use 'location' value as a key in a vfsmap to find the right 'vfs'
137 137 # (cache is currently unused)
138 138 self._backupentries = []
139 139 self._backupmap = {}
140 140 self._backupjournal = "%s.backupfiles" % self.journal
141 141 self._backupsfile = opener.open(self._backupjournal, 'w')
142 142 self._backupsfile.write('%d\n' % version)
143 143
144 144 if createmode is not None:
145 145 opener.chmod(self.journal, createmode & 0o666)
146 146 opener.chmod(self._backupjournal, createmode & 0o666)
147 147
148 148 # hold file generations to be performed on commit
149 149 self._filegenerators = {}
150 150 # hold callback to write pending data for hooks
151 151 self._pendingcallback = {}
152 152 # True is any pending data have been written ever
153 153 self._anypending = False
154 154 # holds callback to call when writing the transaction
155 155 self._finalizecallback = {}
156 156 # hold callback for post transaction close
157 157 self._postclosecallback = {}
158 158 # holds callbacks to call during abort
159 159 self._abortcallback = {}
160 160
161 161 def __del__(self):
162 162 if self.journal:
163 163 self._abort()
164 164
165 165 @active
166 166 def startgroup(self):
167 167 """delay registration of file entry
168 168
169 169 This is used by strip to delay vision of strip offset. The transaction
170 170 sees either none or all of the strip actions to be done."""
171 171 self._queue.append([])
172 172
173 173 @active
174 174 def endgroup(self):
175 175 """apply delayed registration of file entry.
176 176
177 177 This is used by strip to delay vision of strip offset. The transaction
178 178 sees either none or all of the strip actions to be done."""
179 179 q = self._queue.pop()
180 180 for f, o, data in q:
181 181 self._addentry(f, o, data)
182 182
183 183 @active
184 184 def add(self, file, offset, data=None):
185 185 """record the state of an append-only file before update"""
186 186 if file in self.map or file in self._backupmap:
187 187 return
188 188 if self._queue:
189 189 self._queue[-1].append((file, offset, data))
190 190 return
191 191
192 192 self._addentry(file, offset, data)
193 193
194 194 def _addentry(self, file, offset, data):
195 195 """add a append-only entry to memory and on-disk state"""
196 196 if file in self.map or file in self._backupmap:
197 197 return
198 198 self.entries.append((file, offset, data))
199 199 self.map[file] = len(self.entries) - 1
200 200 # add enough data to the journal to do the truncate
201 201 self.file.write("%s\0%d\n" % (file, offset))
202 202 self.file.flush()
203 203
204 204 @active
205 205 def addbackup(self, file, hardlink=True, location=''):
206 206 """Adds a backup of the file to the transaction
207 207
208 208 Calling addbackup() creates a hardlink backup of the specified file
209 209 that is used to recover the file in the event of the transaction
210 210 aborting.
211 211
212 212 * `file`: the file path, relative to .hg/store
213 213 * `hardlink`: use a hardlink to quickly create the backup
214 214 """
215 215 if self._queue:
216 216 msg = 'cannot use transaction.addbackup inside "group"'
217 217 raise RuntimeError(msg)
218 218
219 219 if file in self.map or file in self._backupmap:
220 220 return
221 221 vfs = self._vfsmap[location]
222 222 dirname, filename = vfs.split(file)
223 223 backupfilename = "%s.backup.%s" % (self.journal, filename)
224 224 backupfile = vfs.reljoin(dirname, backupfilename)
225 225 if vfs.exists(file):
226 226 filepath = vfs.join(file)
227 227 backuppath = vfs.join(backupfile)
228 228 util.copyfile(filepath, backuppath, hardlink=hardlink)
229 229 else:
230 230 backupfile = ''
231 231
232 232 self._addbackupentry((location, file, backupfile, False))
233 233
234 234 def _addbackupentry(self, entry):
235 235 """register a new backup entry and write it to disk"""
236 236 self._backupentries.append(entry)
237 237 self._backupmap[entry[1]] = len(self._backupentries) - 1
238 238 self._backupsfile.write("%s\0%s\0%s\0%d\n" % entry)
239 239 self._backupsfile.flush()
240 240
241 241 @active
242 242 def registertmp(self, tmpfile, location=''):
243 243 """register a temporary transaction file
244 244
245 245 Such files will be deleted when the transaction exits (on both
246 246 failure and success).
247 247 """
248 248 self._addbackupentry((location, '', tmpfile, False))
249 249
250 250 @active
251 251 def addfilegenerator(self, genid, filenames, genfunc, order=0,
252 252 location=''):
253 253 """add a function to generates some files at transaction commit
254 254
255 255 The `genfunc` argument is a function capable of generating proper
256 256 content of each entry in the `filename` tuple.
257 257
258 258 At transaction close time, `genfunc` will be called with one file
259 259 object argument per entries in `filenames`.
260 260
261 261 The transaction itself is responsible for the backup, creation and
262 262 final write of such file.
263 263
264 264 The `genid` argument is used to ensure the same set of file is only
265 265 generated once. Call to `addfilegenerator` for a `genid` already
266 266 present will overwrite the old entry.
267 267
268 268 The `order` argument may be used to control the order in which multiple
269 269 generator will be executed.
270 270
271 271 The `location` arguments may be used to indicate the files are located
272 272 outside of the the standard directory for transaction. It should match
273 273 one of the key of the `transaction.vfsmap` dictionary.
274 274 """
275 275 # For now, we are unable to do proper backup and restore of custom vfs
276 276 # but for bookmarks that are handled outside this mechanism.
277 277 self._filegenerators[genid] = (order, filenames, genfunc, location)
278 278
279 279 def _generatefiles(self, suffix=''):
280 280 # write files registered for generation
281 281 any = False
282 282 for entry in sorted(self._filegenerators.values()):
283 283 any = True
284 284 order, filenames, genfunc, location = entry
285 285 vfs = self._vfsmap[location]
286 286 files = []
287 287 try:
288 288 for name in filenames:
289 289 name += suffix
290 290 if suffix:
291 291 self.registertmp(name, location=location)
292 292 else:
293 293 self.addbackup(name, location=location)
294 294 files.append(vfs(name, 'w', atomictemp=True))
295 295 genfunc(*files)
296 296 finally:
297 297 for f in files:
298 298 f.close()
299 299 return any
300 300
301 301 @active
302 302 def find(self, file):
303 303 if file in self.map:
304 304 return self.entries[self.map[file]]
305 305 if file in self._backupmap:
306 306 return self._backupentries[self._backupmap[file]]
307 307 return None
308 308
309 309 @active
310 310 def replace(self, file, offset, data=None):
311 311 '''
312 312 replace can only replace already committed entries
313 313 that are not pending in the queue
314 314 '''
315 315
316 316 if file not in self.map:
317 317 raise KeyError(file)
318 318 index = self.map[file]
319 319 self.entries[index] = (file, offset, data)
320 320 self.file.write("%s\0%d\n" % (file, offset))
321 321 self.file.flush()
322 322
323 323 @active
324 324 def nest(self):
325 325 self.count += 1
326 326 self.usages += 1
327 327 return self
328 328
329 329 def release(self):
330 330 if self.count > 0:
331 331 self.usages -= 1
332 332 # if the transaction scopes are left without being closed, fail
333 333 if self.count > 0 and self.usages == 0:
334 334 self._abort()
335 335
336 def __enter__(self):
337 return self
338
339 def __exit__(self, exc_type, exc_val, exc_tb):
340 if exc_type is None:
341 self.close()
342 self.release()
343
336 344 def running(self):
337 345 return self.count > 0
338 346
339 347 def addpending(self, category, callback):
340 348 """add a callback to be called when the transaction is pending
341 349
342 350 The transaction will be given as callback's first argument.
343 351
344 352 Category is a unique identifier to allow overwriting an old callback
345 353 with a newer callback.
346 354 """
347 355 self._pendingcallback[category] = callback
348 356
349 357 @active
350 358 def writepending(self):
351 359 '''write pending file to temporary version
352 360
353 361 This is used to allow hooks to view a transaction before commit'''
354 362 categories = sorted(self._pendingcallback)
355 363 for cat in categories:
356 364 # remove callback since the data will have been flushed
357 365 any = self._pendingcallback.pop(cat)(self)
358 366 self._anypending = self._anypending or any
359 367 self._anypending |= self._generatefiles(suffix='.pending')
360 368 return self._anypending
361 369
362 370 @active
363 371 def addfinalize(self, category, callback):
364 372 """add a callback to be called when the transaction is closed
365 373
366 374 The transaction will be given as callback's first argument.
367 375
368 376 Category is a unique identifier to allow overwriting old callbacks with
369 377 newer callbacks.
370 378 """
371 379 self._finalizecallback[category] = callback
372 380
373 381 @active
374 382 def addpostclose(self, category, callback):
375 383 """add a callback to be called after the transaction is closed
376 384
377 385 The transaction will be given as callback's first argument.
378 386
379 387 Category is a unique identifier to allow overwriting an old callback
380 388 with a newer callback.
381 389 """
382 390 self._postclosecallback[category] = callback
383 391
384 392 @active
385 393 def addabort(self, category, callback):
386 394 """add a callback to be called when the transaction is aborted.
387 395
388 396 The transaction will be given as the first argument to the callback.
389 397
390 398 Category is a unique identifier to allow overwriting an old callback
391 399 with a newer callback.
392 400 """
393 401 self._abortcallback[category] = callback
394 402
395 403 @active
396 404 def close(self):
397 405 '''commit the transaction'''
398 406 if self.count == 1:
399 407 self.validator(self) # will raise exception if needed
400 408 self._generatefiles()
401 409 categories = sorted(self._finalizecallback)
402 410 for cat in categories:
403 411 self._finalizecallback[cat](self)
404 412
405 413 self.count -= 1
406 414 if self.count != 0:
407 415 return
408 416 self.file.close()
409 417 self._backupsfile.close()
410 418 # cleanup temporary files
411 419 for l, f, b, c in self._backupentries:
412 420 if l not in self._vfsmap and c:
413 421 self.report("couldn't remove %s: unknown cache location %s\n"
414 422 % (b, l))
415 423 continue
416 424 vfs = self._vfsmap[l]
417 425 if not f and b and vfs.exists(b):
418 426 try:
419 427 vfs.unlink(b)
420 428 except (IOError, OSError, error.Abort) as inst:
421 429 if not c:
422 430 raise
423 431 # Abort may be raise by read only opener
424 432 self.report("couldn't remove %s: %s\n"
425 433 % (vfs.join(b), inst))
426 434 self.entries = []
427 435 self._writeundo()
428 436 if self.after:
429 437 self.after()
430 438 if self.opener.isfile(self._backupjournal):
431 439 self.opener.unlink(self._backupjournal)
432 440 if self.opener.isfile(self.journal):
433 441 self.opener.unlink(self.journal)
434 442 for l, _f, b, c in self._backupentries:
435 443 if l not in self._vfsmap and c:
436 444 self.report("couldn't remove %s: unknown cache location"
437 445 "%s\n" % (b, l))
438 446 continue
439 447 vfs = self._vfsmap[l]
440 448 if b and vfs.exists(b):
441 449 try:
442 450 vfs.unlink(b)
443 451 except (IOError, OSError, error.Abort) as inst:
444 452 if not c:
445 453 raise
446 454 # Abort may be raise by read only opener
447 455 self.report("couldn't remove %s: %s\n"
448 456 % (vfs.join(b), inst))
449 457 self._backupentries = []
450 458 self.journal = None
451 459
452 460 self.releasefn(self, True) # notify success of closing transaction
453 461
454 462 # run post close action
455 463 categories = sorted(self._postclosecallback)
456 464 for cat in categories:
457 465 self._postclosecallback[cat](self)
458 466
459 467 @active
460 468 def abort(self):
461 469 '''abort the transaction (generally called on error, or when the
462 470 transaction is not explicitly committed before going out of
463 471 scope)'''
464 472 self._abort()
465 473
466 474 def _writeundo(self):
467 475 """write transaction data for possible future undo call"""
468 476 if self.undoname is None:
469 477 return
470 478 undobackupfile = self.opener.open("%s.backupfiles" % self.undoname, 'w')
471 479 undobackupfile.write('%d\n' % version)
472 480 for l, f, b, c in self._backupentries:
473 481 if not f: # temporary file
474 482 continue
475 483 if not b:
476 484 u = ''
477 485 else:
478 486 if l not in self._vfsmap and c:
479 487 self.report("couldn't remove %s: unknown cache location"
480 488 "%s\n" % (b, l))
481 489 continue
482 490 vfs = self._vfsmap[l]
483 491 base, name = vfs.split(b)
484 492 assert name.startswith(self.journal), name
485 493 uname = name.replace(self.journal, self.undoname, 1)
486 494 u = vfs.reljoin(base, uname)
487 495 util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
488 496 undobackupfile.write("%s\0%s\0%s\0%d\n" % (l, f, u, c))
489 497 undobackupfile.close()
490 498
491 499
492 500 def _abort(self):
493 501 self.count = 0
494 502 self.usages = 0
495 503 self.file.close()
496 504 self._backupsfile.close()
497 505
498 506 try:
499 507 if not self.entries and not self._backupentries:
500 508 if self._backupjournal:
501 509 self.opener.unlink(self._backupjournal)
502 510 if self.journal:
503 511 self.opener.unlink(self.journal)
504 512 return
505 513
506 514 self.report(_("transaction abort!\n"))
507 515
508 516 try:
509 517 for cat in sorted(self._abortcallback):
510 518 self._abortcallback[cat](self)
511 519 _playback(self.journal, self.report, self.opener, self._vfsmap,
512 520 self.entries, self._backupentries, False)
513 521 self.report(_("rollback completed\n"))
514 522 except BaseException:
515 523 self.report(_("rollback failed - please run hg recover\n"))
516 524 finally:
517 525 self.journal = None
518 526 self.releasefn(self, False) # notify failure of transaction
519 527
520 528 def rollback(opener, vfsmap, file, report):
521 529 """Rolls back the transaction contained in the given file
522 530
523 531 Reads the entries in the specified file, and the corresponding
524 532 '*.backupfiles' file, to recover from an incomplete transaction.
525 533
526 534 * `file`: a file containing a list of entries, specifying where
527 535 to truncate each file. The file should contain a list of
528 536 file\0offset pairs, delimited by newlines. The corresponding
529 537 '*.backupfiles' file should contain a list of file\0backupfile
530 538 pairs, delimited by \0.
531 539 """
532 540 entries = []
533 541 backupentries = []
534 542
535 543 fp = opener.open(file)
536 544 lines = fp.readlines()
537 545 fp.close()
538 546 for l in lines:
539 547 try:
540 548 f, o = l.split('\0')
541 549 entries.append((f, int(o), None))
542 550 except ValueError:
543 551 report(_("couldn't read journal entry %r!\n") % l)
544 552
545 553 backupjournal = "%s.backupfiles" % file
546 554 if opener.exists(backupjournal):
547 555 fp = opener.open(backupjournal)
548 556 lines = fp.readlines()
549 557 if lines:
550 558 ver = lines[0][:-1]
551 559 if ver == str(version):
552 560 for line in lines[1:]:
553 561 if line:
554 562 # Shave off the trailing newline
555 563 line = line[:-1]
556 564 l, f, b, c = line.split('\0')
557 565 backupentries.append((l, f, b, bool(c)))
558 566 else:
559 567 report(_("journal was created by a different version of "
560 568 "Mercurial\n"))
561 569
562 570 _playback(file, report, opener, vfsmap, entries, backupentries)
General Comments 0
You need to be logged in to leave comments. Login now