##// END OF EJS Templates
transaction: do not overwrite atomic-temp files on error...
Yuya Nishihara -
r41140:3e2c0283 default
parent child Browse files
Show More
@@ -1,645 +1,649 b''
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import errno
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 error,
21 21 pycompat,
22 22 util,
23 23 )
24 24 from .utils import (
25 25 stringutil,
26 26 )
27 27
28 28 version = 2
29 29
# These are the file generators that should only be executed after the
# finalizers are done, since they rely on the output of the finalizers (like
# the changelog having been written).
postfinalizegenerators = {
    'bookmarks',
    'dirstate'
}

# Symbolic names for the generator groups accepted by _generatefiles():
# run every generator, only the pre-finalize ones, or only the
# post-finalize ones (i.e. those listed in postfinalizegenerators).
gengroupall='all'
gengroupprefinalize='prefinalize'
gengrouppostfinalize='postfinalize'
41 41
def active(func):
    """Decorator refusing to run a method once the transaction is over.

    A transaction whose ``_count`` dropped to zero has been committed or
    aborted; calling any guarded method on it is a programming error.
    """
    def wrapper(self, *args, **kwds):
        if not self._count:
            raise error.Abort(_(
                'cannot use transaction when it is already committed/aborted'))
        return func(self, *args, **kwds)
    return wrapper
49 49
def _playback(journal, report, opener, vfsmap, entries, backupentries,
              unlink=True, checkambigfiles=None):
    """Replay ``journal`` to undo a partially applied transaction.

    ``entries`` holds (file, offset, data) truncation records: files are
    truncated back to ``offset``, or unlinked when the offset is 0 and
    ``unlink`` is true.  ``backupentries`` holds
    (location, file, backupfile, cache) records used to restore full backup
    copies.  ``checkambigfiles`` is a set of (path, vfs-location) tuples for
    which file stat ambiguity must be avoided when rewriting files.
    """
    for f, o, _ignore in entries:
        if o or not unlink:
            # the file existed before the transaction: truncate it back
            checkambig = checkambigfiles and (f, '') in checkambigfiles
            try:
                fp = opener(f, 'a', checkambig=checkambig)
                fp.truncate(o)
                fp.close()
            except IOError:
                report(_("failed to truncate %s\n") % f)
                raise
        else:
            # the file was created by the transaction: remove it entirely
            try:
                opener.unlink(f)
            except (IOError, OSError) as inst:
                if inst.errno != errno.ENOENT:
                    raise

    backupfiles = []
    for l, f, b, c in backupentries:
        if l not in vfsmap and c:
            report("couldn't handle %s: unknown cache location %s\n"
                   % (b, l))
        # NOTE(review): when the location is unknown, execution still falls
        # through to vfsmap[l] below, which would raise KeyError -- confirm
        # whether a 'continue' was intended after the report above.
        vfs = vfsmap[l]
        try:
            if f and b:
                # both names present: restore the backup copy over the file
                filepath = vfs.join(f)
                backuppath = vfs.join(b)
                checkambig = checkambigfiles and (f, l) in checkambigfiles
                try:
                    util.copyfile(backuppath, filepath, checkambig=checkambig)
                    backupfiles.append(b)
                except IOError:
                    report(_("failed to recover %s\n") % f)
            else:
                # one name empty: this was a temporary file, remove it
                target = f or b
                try:
                    vfs.unlink(target)
                except (IOError, OSError) as inst:
                    if inst.errno != errno.ENOENT:
                        raise
        except (IOError, OSError, error.Abort) as inst:
            # cache entries are best-effort; anything else must propagate
            if not c:
                raise

    backuppath = "%s.backupfiles" % journal
    if opener.exists(backuppath):
        opener.unlink(backuppath)
    opener.unlink(journal)
    try:
        for f in backupfiles:
            if opener.exists(f):
                opener.unlink(f)
    except (IOError, OSError, error.Abort) as inst:
        # only pure backup files remain; it is safe to ignore any error
        pass
107 107
108 108 class transaction(util.transactional):
    def __init__(self, report, opener, vfsmap, journalname, undoname=None,
                 after=None, createmode=None, validator=None, releasefn=None,
                 checkambigfiles=None, name=r'<unnamed>'):
        """Begin a new transaction

        Begins a new transaction that allows rolling back writes in the event of
        an exception.

        * `after`: called after the transaction has been committed
        * `createmode`: the mode of the journal file that will be created
        * `releasefn`: called after releasing (with transaction and result)

        `checkambigfiles` is a set of (path, vfs-location) tuples,
        which determine whether file stat ambiguity should be avoided
        for corresponded files.
        """
        # reference count and usage count for nest()/release() bookkeeping
        self._count = 1
        self._usages = 1
        self._report = report
        # a vfs to the store content
        self._opener = opener
        # a map to access file in various {location -> vfs}
        vfsmap = vfsmap.copy()
        vfsmap[''] = opener  # set default value
        self._vfsmap = vfsmap
        self._after = after
        self._entries = []
        self._map = {}
        self._journal = journalname
        self._undoname = undoname
        self._queue = []
        # A callback to validate transaction content before closing it.
        # It should raise an exception if anything is wrong.
        # Target user is repository hooks.
        if validator is None:
            validator = lambda tr: None
        self._validator = validator
        # A callback to do something just after releasing transaction.
        if releasefn is None:
            releasefn = lambda tr, success: None
        self._releasefn = releasefn

        self._checkambigfiles = set()
        if checkambigfiles:
            self._checkambigfiles.update(checkambigfiles)

        # stack of names, one per nest() level, used by __repr__
        self._names = [name]

        # A dict dedicated to precisely tracking the changes introduced in the
        # transaction.
        self.changes = {}

        # a dict of arguments to be passed to hooks
        self.hookargs = {}
        self._file = opener.open(self._journal, "w")

        # a list of ('location', 'path', 'backuppath', cache) entries.
        # - if 'backuppath' is empty, no file existed at backup time
        # - if 'path' is empty, this is a temporary transaction file
        # - if 'location' is not empty, the path is outside main opener reach.
        #   use 'location' value as a key in a vfsmap to find the right 'vfs'
        # (cache is currently unused)
        self._backupentries = []
        self._backupmap = {}
        self._backupjournal = "%s.backupfiles" % self._journal
        self._backupsfile = opener.open(self._backupjournal, 'w')
        self._backupsfile.write('%d\n' % version)

        if createmode is not None:
            opener.chmod(self._journal, createmode & 0o666)
            opener.chmod(self._backupjournal, createmode & 0o666)

        # hold file generations to be performed on commit
        self._filegenerators = {}
        # hold callback to write pending data for hooks
        self._pendingcallback = {}
        # True if any pending data have ever been written
        self._anypending = False
        # holds callback to call when writing the transaction
        self._finalizecallback = {}
        # hold callback for post transaction close
        self._postclosecallback = {}
        # holds callbacks to call during abort
        self._abortcallback = {}
193 193
194 194 def __repr__(self):
195 195 name = r'/'.join(self._names)
196 196 return (r'<transaction name=%s, count=%d, usages=%d>' %
197 197 (name, self._count, self._usages))
198 198
    def __del__(self):
        # A transaction collected without close() having run still has its
        # journal name set; roll it back.  After close()/_abort(),
        # self._journal is None and this is a no-op.
        if self._journal:
            self._abort()
202 202
203 203 @active
204 204 def startgroup(self):
205 205 """delay registration of file entry
206 206
207 207 This is used by strip to delay vision of strip offset. The transaction
208 208 sees either none or all of the strip actions to be done."""
209 209 self._queue.append([])
210 210
211 211 @active
212 212 def endgroup(self):
213 213 """apply delayed registration of file entry.
214 214
215 215 This is used by strip to delay vision of strip offset. The transaction
216 216 sees either none or all of the strip actions to be done."""
217 217 q = self._queue.pop()
218 218 for f, o, data in q:
219 219 self._addentry(f, o, data)
220 220
221 221 @active
222 222 def add(self, file, offset, data=None):
223 223 """record the state of an append-only file before update"""
224 224 if file in self._map or file in self._backupmap:
225 225 return
226 226 if self._queue:
227 227 self._queue[-1].append((file, offset, data))
228 228 return
229 229
230 230 self._addentry(file, offset, data)
231 231
232 232 def _addentry(self, file, offset, data):
233 233 """add a append-only entry to memory and on-disk state"""
234 234 if file in self._map or file in self._backupmap:
235 235 return
236 236 self._entries.append((file, offset, data))
237 237 self._map[file] = len(self._entries) - 1
238 238 # add enough data to the journal to do the truncate
239 239 self._file.write("%s\0%d\n" % (file, offset))
240 240 self._file.flush()
241 241
242 242 @active
243 243 def addbackup(self, file, hardlink=True, location=''):
244 244 """Adds a backup of the file to the transaction
245 245
246 246 Calling addbackup() creates a hardlink backup of the specified file
247 247 that is used to recover the file in the event of the transaction
248 248 aborting.
249 249
250 250 * `file`: the file path, relative to .hg/store
251 251 * `hardlink`: use a hardlink to quickly create the backup
252 252 """
253 253 if self._queue:
254 254 msg = 'cannot use transaction.addbackup inside "group"'
255 255 raise error.ProgrammingError(msg)
256 256
257 257 if file in self._map or file in self._backupmap:
258 258 return
259 259 vfs = self._vfsmap[location]
260 260 dirname, filename = vfs.split(file)
261 261 backupfilename = "%s.backup.%s" % (self._journal, filename)
262 262 backupfile = vfs.reljoin(dirname, backupfilename)
263 263 if vfs.exists(file):
264 264 filepath = vfs.join(file)
265 265 backuppath = vfs.join(backupfile)
266 266 util.copyfile(filepath, backuppath, hardlink=hardlink)
267 267 else:
268 268 backupfile = ''
269 269
270 270 self._addbackupentry((location, file, backupfile, False))
271 271
272 272 def _addbackupentry(self, entry):
273 273 """register a new backup entry and write it to disk"""
274 274 self._backupentries.append(entry)
275 275 self._backupmap[entry[1]] = len(self._backupentries) - 1
276 276 self._backupsfile.write("%s\0%s\0%s\0%d\n" % entry)
277 277 self._backupsfile.flush()
278 278
279 279 @active
280 280 def registertmp(self, tmpfile, location=''):
281 281 """register a temporary transaction file
282 282
283 283 Such files will be deleted when the transaction exits (on both
284 284 failure and success).
285 285 """
286 286 self._addbackupentry((location, '', tmpfile, False))
287 287
288 288 @active
289 289 def addfilegenerator(self, genid, filenames, genfunc, order=0,
290 290 location=''):
291 291 """add a function to generates some files at transaction commit
292 292
293 293 The `genfunc` argument is a function capable of generating proper
294 294 content of each entry in the `filename` tuple.
295 295
296 296 At transaction close time, `genfunc` will be called with one file
297 297 object argument per entries in `filenames`.
298 298
299 299 The transaction itself is responsible for the backup, creation and
300 300 final write of such file.
301 301
302 302 The `genid` argument is used to ensure the same set of file is only
303 303 generated once. Call to `addfilegenerator` for a `genid` already
304 304 present will overwrite the old entry.
305 305
306 306 The `order` argument may be used to control the order in which multiple
307 307 generator will be executed.
308 308
309 309 The `location` arguments may be used to indicate the files are located
310 310 outside of the the standard directory for transaction. It should match
311 311 one of the key of the `transaction.vfsmap` dictionary.
312 312 """
313 313 # For now, we are unable to do proper backup and restore of custom vfs
314 314 # but for bookmarks that are handled outside this mechanism.
315 315 self._filegenerators[genid] = (order, filenames, genfunc, location)
316 316
317 317 @active
318 318 def removefilegenerator(self, genid):
319 319 """reverse of addfilegenerator, remove a file generator function"""
320 320 if genid in self._filegenerators:
321 321 del self._filegenerators[genid]
322 322
    def _generatefiles(self, suffix='', group=gengroupall):
        # write files registered for generation
        #
        # With a non-empty ``suffix`` the files are written as temporary
        # transaction files (registertmp); otherwise the real files are
        # written after taking a backup (addbackup).
        #
        # Returns True if any generator was registered at all -- note the
        # flag is set before the group filter, so skipped generators still
        # count.  (The local name 'any' shadows the builtin.)
        any = False
        for id, entry in sorted(self._filegenerators.iteritems()):
            any = True
            order, filenames, genfunc, location = entry

            # for generation at closing, check if it's before or after finalize
            postfinalize = group == gengrouppostfinalize
            if (group != gengroupall and
                (id in postfinalizegenerators) != (postfinalize)):
                continue

            vfs = self._vfsmap[location]
            files = []
            try:
                for name in filenames:
                    name += suffix
                    if suffix:
                        self.registertmp(name, location=location)
                        checkambig = False
                    else:
                        self.addbackup(name, location=location)
                        checkambig = (name, location) in self._checkambigfiles
                    # atomictemp: content goes to a temp file that close()
                    # renames into place; discard() drops it without renaming
                    files.append(vfs(name, 'w', atomictemp=True,
                                     checkambig=checkambig))
                genfunc(*files)
                for f in files:
                    f.close()
                # skip discard() loop since we're sure no open file remains
                del files[:]
            finally:
                # on error, drop any still-open atomic-temp file so the
                # target file is not overwritten with partial content
                for f in files:
                    f.discard()
        return any
354 358
355 359 @active
356 360 def find(self, file):
357 361 if file in self._map:
358 362 return self._entries[self._map[file]]
359 363 if file in self._backupmap:
360 364 return self._backupentries[self._backupmap[file]]
361 365 return None
362 366
363 367 @active
364 368 def replace(self, file, offset, data=None):
365 369 '''
366 370 replace can only replace already committed entries
367 371 that are not pending in the queue
368 372 '''
369 373
370 374 if file not in self._map:
371 375 raise KeyError(file)
372 376 index = self._map[file]
373 377 self._entries[index] = (file, offset, data)
374 378 self._file.write("%s\0%d\n" % (file, offset))
375 379 self._file.flush()
376 380
377 381 @active
378 382 def nest(self, name=r'<unnamed>'):
379 383 self._count += 1
380 384 self._usages += 1
381 385 self._names.append(name)
382 386 return self
383 387
    def release(self):
        # drop one usage; when the last usage goes away while the
        # transaction is still open (count > 0), nobody called close(),
        # so the transaction must be rolled back
        if self._count > 0:
            self._usages -= 1
        if self._names:
            self._names.pop()
        # if the transaction scopes are left without being closed, fail
        if self._count > 0 and self._usages == 0:
            self._abort()
392 396
393 397 def running(self):
394 398 return self._count > 0
395 399
396 400 def addpending(self, category, callback):
397 401 """add a callback to be called when the transaction is pending
398 402
399 403 The transaction will be given as callback's first argument.
400 404
401 405 Category is a unique identifier to allow overwriting an old callback
402 406 with a newer callback.
403 407 """
404 408 self._pendingcallback[category] = callback
405 409
406 410 @active
407 411 def writepending(self):
408 412 '''write pending file to temporary version
409 413
410 414 This is used to allow hooks to view a transaction before commit'''
411 415 categories = sorted(self._pendingcallback)
412 416 for cat in categories:
413 417 # remove callback since the data will have been flushed
414 418 any = self._pendingcallback.pop(cat)(self)
415 419 self._anypending = self._anypending or any
416 420 self._anypending |= self._generatefiles(suffix='.pending')
417 421 return self._anypending
418 422
419 423 @active
420 424 def addfinalize(self, category, callback):
421 425 """add a callback to be called when the transaction is closed
422 426
423 427 The transaction will be given as callback's first argument.
424 428
425 429 Category is a unique identifier to allow overwriting old callbacks with
426 430 newer callbacks.
427 431 """
428 432 self._finalizecallback[category] = callback
429 433
430 434 @active
431 435 def addpostclose(self, category, callback):
432 436 """add or replace a callback to be called after the transaction closed
433 437
434 438 The transaction will be given as callback's first argument.
435 439
436 440 Category is a unique identifier to allow overwriting an old callback
437 441 with a newer callback.
438 442 """
439 443 self._postclosecallback[category] = callback
440 444
441 445 @active
442 446 def getpostclose(self, category):
443 447 """return a postclose callback added before, or None"""
444 448 return self._postclosecallback.get(category, None)
445 449
446 450 @active
447 451 def addabort(self, category, callback):
448 452 """add a callback to be called when the transaction is aborted.
449 453
450 454 The transaction will be given as the first argument to the callback.
451 455
452 456 Category is a unique identifier to allow overwriting an old callback
453 457 with a newer callback.
454 458 """
455 459 self._abortcallback[category] = callback
456 460
    @active
    def close(self):
        '''commit the transaction

        On the outermost scope this validates, runs the file generators and
        finalizers, removes temporary files and backups, writes the undo
        journal, and finally runs release/postclose callbacks.  Inner
        (nested) scopes merely decrement the count.
        '''
        if self._count == 1:
            self._validator(self)  # will raise exception if needed
            self._validator = None  # Help prevent cycles.
            self._generatefiles(group=gengroupprefinalize)
            categories = sorted(self._finalizecallback)
            for cat in categories:
                self._finalizecallback[cat](self)
            # Prevent double usage and help clear cycles.
            self._finalizecallback = None
            self._generatefiles(group=gengrouppostfinalize)

        self._count -= 1
        if self._count != 0:
            # still inside a nested scope; nothing else to do yet
            return
        self._file.close()
        self._backupsfile.close()
        # cleanup temporary files (entries with an empty 'path' field)
        for l, f, b, c in self._backupentries:
            if l not in self._vfsmap and c:
                self._report("couldn't remove %s: unknown cache location %s\n"
                             % (b, l))
                continue
            vfs = self._vfsmap[l]
            if not f and b and vfs.exists(b):
                try:
                    vfs.unlink(b)
                except (IOError, OSError, error.Abort) as inst:
                    if not c:
                        raise
                    # Abort may be raised by a read-only opener
                    self._report("couldn't remove %s: %s\n"
                                 % (vfs.join(b), inst))
        self._entries = []
        self._writeundo()
        if self._after:
            self._after()
            self._after = None  # Help prevent cycles.
        if self._opener.isfile(self._backupjournal):
            self._opener.unlink(self._backupjournal)
        if self._opener.isfile(self._journal):
            self._opener.unlink(self._journal)
        # remove the remaining backup copies, now that the commit succeeded
        for l, _f, b, c in self._backupentries:
            if l not in self._vfsmap and c:
                self._report("couldn't remove %s: unknown cache location"
                             "%s\n" % (b, l))
                continue
            vfs = self._vfsmap[l]
            if b and vfs.exists(b):
                try:
                    vfs.unlink(b)
                except (IOError, OSError, error.Abort) as inst:
                    if not c:
                        raise
                    # Abort may be raised by a read-only opener
                    self._report("couldn't remove %s: %s\n"
                                 % (vfs.join(b), inst))
        self._backupentries = []
        # a None journal marks the transaction as finished (see __del__)
        self._journal = None

        self._releasefn(self, True)  # notify success of closing transaction
        self._releasefn = None  # Help prevent cycles.

        # run post close action
        categories = sorted(self._postclosecallback)
        for cat in categories:
            self._postclosecallback[cat](self)
        # Prevent double usage and help clear cycles.
        self._postclosecallback = None
528 532
529 533 @active
530 534 def abort(self):
531 535 '''abort the transaction (generally called on error, or when the
532 536 transaction is not explicitly committed before going out of
533 537 scope)'''
534 538 self._abort()
535 539
    def _writeundo(self):
        """write transaction data for possible future undo call

        Copies each backup file under the undo name and records the
        renamed entries in '<undoname>.backupfiles'.  No-op when no undo
        name was configured.
        """
        if self._undoname is None:
            return
        undobackupfile = self._opener.open("%s.backupfiles" % self._undoname,
                                           'w')
        undobackupfile.write('%d\n' % version)
        for l, f, b, c in self._backupentries:
            if not f:  # temporary file
                continue
            if not b:
                # no backup was taken (file did not exist before)
                u = ''
            else:
                if l not in self._vfsmap and c:
                    # NOTE(review): message says "remove" although nothing is
                    # removed here -- looks copied from close(); confirm
                    self._report("couldn't remove %s: unknown cache location"
                                 "%s\n" % (b, l))
                    continue
                vfs = self._vfsmap[l]
                base, name = vfs.split(b)
                assert name.startswith(self._journal), name
                # rename the journal-prefixed backup to its undo-prefixed twin
                uname = name.replace(self._journal, self._undoname, 1)
                u = vfs.reljoin(base, uname)
                util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
            undobackupfile.write("%s\0%s\0%s\0%d\n" % (l, f, u, c))
        undobackupfile.close()
561 565
562 566
    def _abort(self):
        """roll the transaction back and release it

        Marks the transaction finished, runs abort callbacks, and replays
        the journal via _playback().  A failed rollback is reported but not
        re-raised; the journal is left for 'hg recover'.
        """
        self._count = 0
        self._usages = 0
        self._file.close()
        self._backupsfile.close()

        try:
            if not self._entries and not self._backupentries:
                # nothing was written: just drop the (empty) journal files
                if self._backupjournal:
                    self._opener.unlink(self._backupjournal)
                if self._journal:
                    self._opener.unlink(self._journal)
                return

            self._report(_("transaction abort!\n"))

            try:
                for cat in sorted(self._abortcallback):
                    self._abortcallback[cat](self)
                # Prevent double usage and help clear cycles.
                self._abortcallback = None
                _playback(self._journal, self._report, self._opener,
                          self._vfsmap, self._entries, self._backupentries,
                          False, checkambigfiles=self._checkambigfiles)
                self._report(_("rollback completed\n"))
            except BaseException as exc:
                self._report(_("rollback failed - please run hg recover\n"))
                self._report(_("(failure reason: %s)\n")
                             % stringutil.forcebytestr(exc))
        finally:
            self._journal = None
            self._releasefn(self, False)  # notify failure of transaction
            self._releasefn = None  # Help prevent cycles.
596 600
def rollback(opener, vfsmap, file, report, checkambigfiles=None):
    """Rolls back the transaction contained in the given file

    Reads the entries in the specified file, and the corresponding
    '*.backupfiles' file, to recover from an incomplete transaction.

    * `file`: a file containing a list of entries, specifying where
    to truncate each file.  The file should contain a list of
    file\0offset pairs, delimited by newlines. The corresponding
    '*.backupfiles' file should contain a list of file\0backupfile
    pairs, delimited by \0.

    `checkambigfiles` is a set of (path, vfs-location) tuples,
    which determine whether file stat ambiguity should be avoided at
    restoring corresponded files.
    """
    entries = []
    backupentries = []

    fp = opener.open(file)
    lines = fp.readlines()
    fp.close()
    for l in lines:
        try:
            f, o = l.split('\0')
            entries.append((f, int(o), None))
        except ValueError:
            # a truncated/garbled line; report it and keep going
            report(
                _("couldn't read journal entry %r!\n") % pycompat.bytestr(l))

    backupjournal = "%s.backupfiles" % file
    if opener.exists(backupjournal):
        fp = opener.open(backupjournal)
        lines = fp.readlines()
        if lines:
            ver = lines[0][:-1]
            if ver == (b'%d' % version):
                for line in lines[1:]:
                    if line:
                        # Shave off the trailing newline
                        line = line[:-1]
                        l, f, b, c = line.split('\0')
                        # NOTE(review): c is the raw string field from the
                        # file, so bool(c) is True for any non-empty value,
                        # including "0" -- confirm this is intended
                        backupentries.append((l, f, b, bool(c)))
            else:
                report(_("journal was created by a different version of "
                         "Mercurial\n"))

    _playback(file, report, opener, vfsmap, entries, backupentries,
              checkambigfiles=checkambigfiles)
General Comments 0
You need to be logged in to leave comments. Login now