##// END OF EJS Templates
transaction: detect an attempt to truncate-to-extend on playback, raise error...
Kyle Lippincott -
r43233:8502f76d default
parent child Browse files
Show More
@@ -1,649 +1,653 b''
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import errno
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 error,
21 21 pycompat,
22 22 util,
23 23 )
24 24 from .utils import (
25 25 stringutil,
26 26 )
27 27
# on-disk format version written as the first line of the journal's
# '*.backupfiles' companion file; rollback() refuses other versions
version = 2

# These are the file generators that should only be executed after the
# finalizers are done, since they rely on the output of the finalizers (like
# the changelog having been written).
postfinalizegenerators = {
    'bookmarks',
    'dirstate'
}

# group selectors passed to _generatefiles() to pick which registered file
# generators run: everything, only pre-finalize, or only post-finalize ones
gengroupall='all'
gengroupprefinalize='prefinalize'
gengrouppostfinalize='postfinalize'
41 41
def active(func):
    """Decorator: only allow *func* while the transaction is running.

    Raises error.Abort if the wrapped method is invoked after the
    transaction has been committed or aborted (refcount dropped to zero).
    """
    def _active(self, *args, **kwds):
        if self._count != 0:
            return func(self, *args, **kwds)
        raise error.Abort(_(
            'cannot use transaction when it is already committed/aborted'))
    return _active
49 49
50 50 def _playback(journal, report, opener, vfsmap, entries, backupentries,
51 51 unlink=True, checkambigfiles=None):
52 52 for f, o, _ignore in entries:
53 53 if o or not unlink:
54 54 checkambig = checkambigfiles and (f, '') in checkambigfiles
55 55 try:
56 56 fp = opener(f, 'a', checkambig=checkambig)
57 if fp.tell() < o:
58 raise error.Abort(_(
59 "attempted to truncate %s to %d bytes, but it was "
60 "already %d bytes\n") % (f, o, fp.tell()))
57 61 fp.truncate(o)
58 62 fp.close()
59 63 except IOError:
60 64 report(_("failed to truncate %s\n") % f)
61 65 raise
62 66 else:
63 67 try:
64 68 opener.unlink(f)
65 69 except (IOError, OSError) as inst:
66 70 if inst.errno != errno.ENOENT:
67 71 raise
68 72
69 73 backupfiles = []
70 74 for l, f, b, c in backupentries:
71 75 if l not in vfsmap and c:
72 76 report("couldn't handle %s: unknown cache location %s\n"
73 77 % (b, l))
74 78 vfs = vfsmap[l]
75 79 try:
76 80 if f and b:
77 81 filepath = vfs.join(f)
78 82 backuppath = vfs.join(b)
79 83 checkambig = checkambigfiles and (f, l) in checkambigfiles
80 84 try:
81 85 util.copyfile(backuppath, filepath, checkambig=checkambig)
82 86 backupfiles.append(b)
83 87 except IOError:
84 88 report(_("failed to recover %s\n") % f)
85 89 else:
86 90 target = f or b
87 91 try:
88 92 vfs.unlink(target)
89 93 except (IOError, OSError) as inst:
90 94 if inst.errno != errno.ENOENT:
91 95 raise
92 96 except (IOError, OSError, error.Abort):
93 97 if not c:
94 98 raise
95 99
96 100 backuppath = "%s.backupfiles" % journal
97 101 if opener.exists(backuppath):
98 102 opener.unlink(backuppath)
99 103 opener.unlink(journal)
100 104 try:
101 105 for f in backupfiles:
102 106 if opener.exists(f):
103 107 opener.unlink(f)
104 108 except (IOError, OSError, error.Abort):
105 109 # only pure backup file remains, it is sage to ignore any error
106 110 pass
107 111
108 112 class transaction(util.transactional):
    def __init__(self, report, opener, vfsmap, journalname, undoname=None,
                 after=None, createmode=None, validator=None, releasefn=None,
                 checkambigfiles=None, name=r'<unnamed>'):
        """Begin a new transaction

        Begins a new transaction that allows rolling back writes in the event of
        an exception.

        * `report`: callable used to emit warning/error messages
        * `opener`: vfs rooted at the store; also the default vfsmap entry
        * `vfsmap`: {location -> vfs} map for files outside the store
        * `journalname`: name of the journal file to create
        * `undoname`: if set, backups are kept under this name after close
        * `after`: called after the transaction has been committed
        * `createmode`: the mode of the journal file that will be created
        * `validator`: called with the transaction before closing; should
          raise to veto the close (target user is repository hooks)
        * `releasefn`: called after releasing (with transaction and result)
        * `name`: debugging label reported by __repr__

        `checkambigfiles` is a set of (path, vfs-location) tuples,
        which determine whether file stat ambiguity should be avoided
        for corresponded files.
        """
        self._count = 1
        self._usages = 1
        self._report = report
        # a vfs to the store content
        self._opener = opener
        # a map to access file in various {location -> vfs}
        vfsmap = vfsmap.copy()
        vfsmap[''] = opener # set default value
        self._vfsmap = vfsmap
        self._after = after
        self._entries = []
        self._map = {}
        self._journal = journalname
        self._undoname = undoname
        self._queue = []
        # A callback to validate transaction content before closing it.
        # should raise exception if anything is wrong.
        # target user is repository hooks.
        if validator is None:
            validator = lambda tr: None
        self._validator = validator
        # A callback to do something just after releasing transaction.
        if releasefn is None:
            releasefn = lambda tr, success: None
        self._releasefn = releasefn

        self._checkambigfiles = set()
        if checkambigfiles:
            self._checkambigfiles.update(checkambigfiles)

        # names of this transaction and of transactions nested within it
        self._names = [name]

        # A dict dedicated to precisely tracking the changes introduced in the
        # transaction.
        self.changes = {}

        # a dict of arguments to be passed to hooks
        self.hookargs = {}
        # the journal itself; truncate records are appended as files change
        self._file = opener.open(self._journal, "w")

        # a list of ('location', 'path', 'backuppath', cache) entries.
        # - if 'backuppath' is empty, no file existed at backup time
        # - if 'path' is empty, this is a temporary transaction file
        # - if 'location' is not empty, the path is outside main opener reach.
        #   use 'location' value as a key in a vfsmap to find the right 'vfs'
        # (cache is currently unused)
        self._backupentries = []
        self._backupmap = {}
        self._backupjournal = "%s.backupfiles" % self._journal
        self._backupsfile = opener.open(self._backupjournal, 'w')
        # the backup journal starts with the on-disk format version
        self._backupsfile.write('%d\n' % version)

        if createmode is not None:
            opener.chmod(self._journal, createmode & 0o666)
            opener.chmod(self._backupjournal, createmode & 0o666)

        # hold file generations to be performed on commit
        self._filegenerators = {}
        # hold callback to write pending data for hooks
        self._pendingcallback = {}
        # True if any pending data have been written ever
        self._anypending = False
        # holds callback to call when writing the transaction
        self._finalizecallback = {}
        # hold callback for post transaction close
        self._postclosecallback = {}
        # holds callbacks to call during abort
        self._abortcallback = {}
193 197
194 198 def __repr__(self):
195 199 name = r'/'.join(self._names)
196 200 return (r'<transaction name=%s, count=%d, usages=%d>' %
197 201 (name, self._count, self._usages))
198 202
199 203 def __del__(self):
200 204 if self._journal:
201 205 self._abort()
202 206
203 207 @active
204 208 def startgroup(self):
205 209 """delay registration of file entry
206 210
207 211 This is used by strip to delay vision of strip offset. The transaction
208 212 sees either none or all of the strip actions to be done."""
209 213 self._queue.append([])
210 214
211 215 @active
212 216 def endgroup(self):
213 217 """apply delayed registration of file entry.
214 218
215 219 This is used by strip to delay vision of strip offset. The transaction
216 220 sees either none or all of the strip actions to be done."""
217 221 q = self._queue.pop()
218 222 for f, o, data in q:
219 223 self._addentry(f, o, data)
220 224
221 225 @active
222 226 def add(self, file, offset, data=None):
223 227 """record the state of an append-only file before update"""
224 228 if file in self._map or file in self._backupmap:
225 229 return
226 230 if self._queue:
227 231 self._queue[-1].append((file, offset, data))
228 232 return
229 233
230 234 self._addentry(file, offset, data)
231 235
232 236 def _addentry(self, file, offset, data):
233 237 """add a append-only entry to memory and on-disk state"""
234 238 if file in self._map or file in self._backupmap:
235 239 return
236 240 self._entries.append((file, offset, data))
237 241 self._map[file] = len(self._entries) - 1
238 242 # add enough data to the journal to do the truncate
239 243 self._file.write("%s\0%d\n" % (file, offset))
240 244 self._file.flush()
241 245
242 246 @active
243 247 def addbackup(self, file, hardlink=True, location=''):
244 248 """Adds a backup of the file to the transaction
245 249
246 250 Calling addbackup() creates a hardlink backup of the specified file
247 251 that is used to recover the file in the event of the transaction
248 252 aborting.
249 253
250 254 * `file`: the file path, relative to .hg/store
251 255 * `hardlink`: use a hardlink to quickly create the backup
252 256 """
253 257 if self._queue:
254 258 msg = 'cannot use transaction.addbackup inside "group"'
255 259 raise error.ProgrammingError(msg)
256 260
257 261 if file in self._map or file in self._backupmap:
258 262 return
259 263 vfs = self._vfsmap[location]
260 264 dirname, filename = vfs.split(file)
261 265 backupfilename = "%s.backup.%s" % (self._journal, filename)
262 266 backupfile = vfs.reljoin(dirname, backupfilename)
263 267 if vfs.exists(file):
264 268 filepath = vfs.join(file)
265 269 backuppath = vfs.join(backupfile)
266 270 util.copyfile(filepath, backuppath, hardlink=hardlink)
267 271 else:
268 272 backupfile = ''
269 273
270 274 self._addbackupentry((location, file, backupfile, False))
271 275
272 276 def _addbackupentry(self, entry):
273 277 """register a new backup entry and write it to disk"""
274 278 self._backupentries.append(entry)
275 279 self._backupmap[entry[1]] = len(self._backupentries) - 1
276 280 self._backupsfile.write("%s\0%s\0%s\0%d\n" % entry)
277 281 self._backupsfile.flush()
278 282
279 283 @active
280 284 def registertmp(self, tmpfile, location=''):
281 285 """register a temporary transaction file
282 286
283 287 Such files will be deleted when the transaction exits (on both
284 288 failure and success).
285 289 """
286 290 self._addbackupentry((location, '', tmpfile, False))
287 291
288 292 @active
289 293 def addfilegenerator(self, genid, filenames, genfunc, order=0,
290 294 location=''):
291 295 """add a function to generates some files at transaction commit
292 296
293 297 The `genfunc` argument is a function capable of generating proper
294 298 content of each entry in the `filename` tuple.
295 299
296 300 At transaction close time, `genfunc` will be called with one file
297 301 object argument per entries in `filenames`.
298 302
299 303 The transaction itself is responsible for the backup, creation and
300 304 final write of such file.
301 305
302 306 The `genid` argument is used to ensure the same set of file is only
303 307 generated once. Call to `addfilegenerator` for a `genid` already
304 308 present will overwrite the old entry.
305 309
306 310 The `order` argument may be used to control the order in which multiple
307 311 generator will be executed.
308 312
309 313 The `location` arguments may be used to indicate the files are located
310 314 outside of the the standard directory for transaction. It should match
311 315 one of the key of the `transaction.vfsmap` dictionary.
312 316 """
313 317 # For now, we are unable to do proper backup and restore of custom vfs
314 318 # but for bookmarks that are handled outside this mechanism.
315 319 self._filegenerators[genid] = (order, filenames, genfunc, location)
316 320
317 321 @active
318 322 def removefilegenerator(self, genid):
319 323 """reverse of addfilegenerator, remove a file generator function"""
320 324 if genid in self._filegenerators:
321 325 del self._filegenerators[genid]
322 326
323 327 def _generatefiles(self, suffix='', group=gengroupall):
324 328 # write files registered for generation
325 329 any = False
326 330 for id, entry in sorted(self._filegenerators.iteritems()):
327 331 any = True
328 332 order, filenames, genfunc, location = entry
329 333
330 334 # for generation at closing, check if it's before or after finalize
331 335 postfinalize = group == gengrouppostfinalize
332 336 if (group != gengroupall and
333 337 (id in postfinalizegenerators) != (postfinalize)):
334 338 continue
335 339
336 340 vfs = self._vfsmap[location]
337 341 files = []
338 342 try:
339 343 for name in filenames:
340 344 name += suffix
341 345 if suffix:
342 346 self.registertmp(name, location=location)
343 347 checkambig = False
344 348 else:
345 349 self.addbackup(name, location=location)
346 350 checkambig = (name, location) in self._checkambigfiles
347 351 files.append(vfs(name, 'w', atomictemp=True,
348 352 checkambig=checkambig))
349 353 genfunc(*files)
350 354 for f in files:
351 355 f.close()
352 356 # skip discard() loop since we're sure no open file remains
353 357 del files[:]
354 358 finally:
355 359 for f in files:
356 360 f.discard()
357 361 return any
358 362
359 363 @active
360 364 def find(self, file):
361 365 if file in self._map:
362 366 return self._entries[self._map[file]]
363 367 if file in self._backupmap:
364 368 return self._backupentries[self._backupmap[file]]
365 369 return None
366 370
367 371 @active
368 372 def replace(self, file, offset, data=None):
369 373 '''
370 374 replace can only replace already committed entries
371 375 that are not pending in the queue
372 376 '''
373 377
374 378 if file not in self._map:
375 379 raise KeyError(file)
376 380 index = self._map[file]
377 381 self._entries[index] = (file, offset, data)
378 382 self._file.write("%s\0%d\n" % (file, offset))
379 383 self._file.flush()
380 384
381 385 @active
382 386 def nest(self, name=r'<unnamed>'):
383 387 self._count += 1
384 388 self._usages += 1
385 389 self._names.append(name)
386 390 return self
387 391
388 392 def release(self):
389 393 if self._count > 0:
390 394 self._usages -= 1
391 395 if self._names:
392 396 self._names.pop()
393 397 # if the transaction scopes are left without being closed, fail
394 398 if self._count > 0 and self._usages == 0:
395 399 self._abort()
396 400
397 401 def running(self):
398 402 return self._count > 0
399 403
400 404 def addpending(self, category, callback):
401 405 """add a callback to be called when the transaction is pending
402 406
403 407 The transaction will be given as callback's first argument.
404 408
405 409 Category is a unique identifier to allow overwriting an old callback
406 410 with a newer callback.
407 411 """
408 412 self._pendingcallback[category] = callback
409 413
410 414 @active
411 415 def writepending(self):
412 416 '''write pending file to temporary version
413 417
414 418 This is used to allow hooks to view a transaction before commit'''
415 419 categories = sorted(self._pendingcallback)
416 420 for cat in categories:
417 421 # remove callback since the data will have been flushed
418 422 any = self._pendingcallback.pop(cat)(self)
419 423 self._anypending = self._anypending or any
420 424 self._anypending |= self._generatefiles(suffix='.pending')
421 425 return self._anypending
422 426
423 427 @active
424 428 def addfinalize(self, category, callback):
425 429 """add a callback to be called when the transaction is closed
426 430
427 431 The transaction will be given as callback's first argument.
428 432
429 433 Category is a unique identifier to allow overwriting old callbacks with
430 434 newer callbacks.
431 435 """
432 436 self._finalizecallback[category] = callback
433 437
434 438 @active
435 439 def addpostclose(self, category, callback):
436 440 """add or replace a callback to be called after the transaction closed
437 441
438 442 The transaction will be given as callback's first argument.
439 443
440 444 Category is a unique identifier to allow overwriting an old callback
441 445 with a newer callback.
442 446 """
443 447 self._postclosecallback[category] = callback
444 448
445 449 @active
446 450 def getpostclose(self, category):
447 451 """return a postclose callback added before, or None"""
448 452 return self._postclosecallback.get(category, None)
449 453
450 454 @active
451 455 def addabort(self, category, callback):
452 456 """add a callback to be called when the transaction is aborted.
453 457
454 458 The transaction will be given as the first argument to the callback.
455 459
456 460 Category is a unique identifier to allow overwriting an old callback
457 461 with a newer callback.
458 462 """
459 463 self._abortcallback[category] = callback
460 464
461 465 @active
462 466 def close(self):
463 467 '''commit the transaction'''
464 468 if self._count == 1:
465 469 self._validator(self) # will raise exception if needed
466 470 self._validator = None # Help prevent cycles.
467 471 self._generatefiles(group=gengroupprefinalize)
468 472 categories = sorted(self._finalizecallback)
469 473 for cat in categories:
470 474 self._finalizecallback[cat](self)
471 475 # Prevent double usage and help clear cycles.
472 476 self._finalizecallback = None
473 477 self._generatefiles(group=gengrouppostfinalize)
474 478
475 479 self._count -= 1
476 480 if self._count != 0:
477 481 return
478 482 self._file.close()
479 483 self._backupsfile.close()
480 484 # cleanup temporary files
481 485 for l, f, b, c in self._backupentries:
482 486 if l not in self._vfsmap and c:
483 487 self._report("couldn't remove %s: unknown cache location %s\n"
484 488 % (b, l))
485 489 continue
486 490 vfs = self._vfsmap[l]
487 491 if not f and b and vfs.exists(b):
488 492 try:
489 493 vfs.unlink(b)
490 494 except (IOError, OSError, error.Abort) as inst:
491 495 if not c:
492 496 raise
493 497 # Abort may be raise by read only opener
494 498 self._report("couldn't remove %s: %s\n"
495 499 % (vfs.join(b), inst))
496 500 self._entries = []
497 501 self._writeundo()
498 502 if self._after:
499 503 self._after()
500 504 self._after = None # Help prevent cycles.
501 505 if self._opener.isfile(self._backupjournal):
502 506 self._opener.unlink(self._backupjournal)
503 507 if self._opener.isfile(self._journal):
504 508 self._opener.unlink(self._journal)
505 509 for l, _f, b, c in self._backupentries:
506 510 if l not in self._vfsmap and c:
507 511 self._report("couldn't remove %s: unknown cache location"
508 512 "%s\n" % (b, l))
509 513 continue
510 514 vfs = self._vfsmap[l]
511 515 if b and vfs.exists(b):
512 516 try:
513 517 vfs.unlink(b)
514 518 except (IOError, OSError, error.Abort) as inst:
515 519 if not c:
516 520 raise
517 521 # Abort may be raise by read only opener
518 522 self._report("couldn't remove %s: %s\n"
519 523 % (vfs.join(b), inst))
520 524 self._backupentries = []
521 525 self._journal = None
522 526
523 527 self._releasefn(self, True) # notify success of closing transaction
524 528 self._releasefn = None # Help prevent cycles.
525 529
526 530 # run post close action
527 531 categories = sorted(self._postclosecallback)
528 532 for cat in categories:
529 533 self._postclosecallback[cat](self)
530 534 # Prevent double usage and help clear cycles.
531 535 self._postclosecallback = None
532 536
533 537 @active
534 538 def abort(self):
535 539 '''abort the transaction (generally called on error, or when the
536 540 transaction is not explicitly committed before going out of
537 541 scope)'''
538 542 self._abort()
539 543
540 544 def _writeundo(self):
541 545 """write transaction data for possible future undo call"""
542 546 if self._undoname is None:
543 547 return
544 548 undobackupfile = self._opener.open("%s.backupfiles" % self._undoname,
545 549 'w')
546 550 undobackupfile.write('%d\n' % version)
547 551 for l, f, b, c in self._backupentries:
548 552 if not f: # temporary file
549 553 continue
550 554 if not b:
551 555 u = ''
552 556 else:
553 557 if l not in self._vfsmap and c:
554 558 self._report("couldn't remove %s: unknown cache location"
555 559 "%s\n" % (b, l))
556 560 continue
557 561 vfs = self._vfsmap[l]
558 562 base, name = vfs.split(b)
559 563 assert name.startswith(self._journal), name
560 564 uname = name.replace(self._journal, self._undoname, 1)
561 565 u = vfs.reljoin(base, uname)
562 566 util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
563 567 undobackupfile.write("%s\0%s\0%s\0%d\n" % (l, f, u, c))
564 568 undobackupfile.close()
565 569
566 570
    def _abort(self):
        """Roll the transaction back: replay the journal and clean up."""
        # mark the transaction dead before touching disk so re-entry (e.g.
        # via __del__) is impossible
        self._count = 0
        self._usages = 0
        self._file.close()
        self._backupsfile.close()

        try:
            if not self._entries and not self._backupentries:
                # nothing was ever written: just remove the journal files
                if self._backupjournal:
                    self._opener.unlink(self._backupjournal)
                if self._journal:
                    self._opener.unlink(self._journal)
                return

            self._report(_("transaction abort!\n"))

            try:
                for cat in sorted(self._abortcallback):
                    self._abortcallback[cat](self)
                # Prevent double usage and help clear cycles.
                self._abortcallback = None
                # unlink=False: files with offset 0 are truncated to zero
                # length instead of being deleted during playback
                _playback(self._journal, self._report, self._opener,
                          self._vfsmap, self._entries, self._backupentries,
                          False, checkambigfiles=self._checkambigfiles)
                self._report(_("rollback completed\n"))
            except BaseException as exc:
                # even KeyboardInterrupt/SystemExit are reported before
                # falling through to the finally clause
                self._report(_("rollback failed - please run hg recover\n"))
                self._report(_("(failure reason: %s)\n")
                             % stringutil.forcebytestr(exc))
        finally:
            self._journal = None
            self._releasefn(self, False) # notify failure of transaction
            self._releasefn = None # Help prevent cycles.
600 604
def rollback(opener, vfsmap, file, report, checkambigfiles=None):
    """Rolls back the transaction contained in the given file

    Reads the entries in the specified file, and the corresponding
    '*.backupfiles' file, to recover from an incomplete transaction.

    * `file`: a file containing a list of entries, specifying where
      to truncate each file.  The file should contain a list of
      file\0offset pairs, delimited by newlines. The corresponding
      '*.backupfiles' file should contain a list of file\0backupfile
      pairs, delimited by \0.

    `checkambigfiles` is a set of (path, vfs-location) tuples,
    which determine whether file stat ambiguity should be avoided at
    restoring corresponded files.
    """
    entries = []
    backupentries = []

    fp = opener.open(file)
    lines = fp.readlines()
    fp.close()
    for l in lines:
        try:
            # int() tolerates the trailing newline left by readlines()
            f, o = l.split('\0')
            entries.append((f, int(o), None))
        except ValueError:
            # malformed line (wrong field count or non-numeric offset):
            # report and keep going with the remaining entries
            report(
                _("couldn't read journal entry %r!\n") % pycompat.bytestr(l))

    backupjournal = "%s.backupfiles" % file
    if opener.exists(backupjournal):
        fp = opener.open(backupjournal)
        lines = fp.readlines()
        if lines:
            # first line carries the on-disk format version
            ver = lines[0][:-1]
            if ver == (b'%d' % version):
                for line in lines[1:]:
                    if line:
                        # Shave off the trailing newline
                        line = line[:-1]
                        # NOTE(review): c was serialized with %d as '0'/'1';
                        # bool(c) is True for BOTH since non-empty strings
                        # are truthy -- confirm whether that is intended
                        l, f, b, c = line.split('\0')
                        backupentries.append((l, f, b, bool(c)))
            else:
                report(_("journal was created by a different version of "
                         "Mercurial\n"))

    _playback(file, report, opener, vfsmap, entries, backupentries,
              checkambigfiles=checkambigfiles)
General Comments 0
You need to be logged in to leave comments. Login now