##// END OF EJS Templates
transaction: open a file with checkambig=True to avoid file stat ambiguity...
FUJIWARA Katsunori -
r30002:599912a6 default
parent child Browse files
Show More
@@ -1,599 +1,599 b''
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import errno
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 error,
21 21 util,
22 22 )
23 23
version = 2

# File generators in this set must run only after the finalizers have
# completed, because they consume finalizer output (e.g. the changelog
# having been written).
postfinalizegenerators = {'bookmarks', 'dirstate'}

# Generator-group selectors accepted by _generatefiles().
gengroupall = 'all'
gengroupprefinalize = 'prefinalize'
gengrouppostfinalize = 'postfinalize'
37 37
def active(func):
    """Decorator: allow *func* only while the transaction is still open.

    Raises error.Abort once the transaction has been committed or aborted
    (i.e. when ``self.count`` has dropped to zero).
    """
    def _active(self, *args, **kwds):
        if self.count:
            return func(self, *args, **kwds)
        raise error.Abort(_(
            'cannot use transaction when it is already committed/aborted'))
    return _active
45 45
def _playback(journal, report, opener, vfsmap, entries, backupentries,
              unlink=True):
    """Restore files to their pre-transaction state.

    ``entries`` holds (file, offset, ignored) triples for append-only
    files: each file is truncated back to ``offset``, or unlinked when
    the offset is 0 and ``unlink`` is True.  ``backupentries`` holds
    (location, file, backupfile, cache) tuples: each backup copy is
    restored over the live file, and live/temporary files without a
    backup are removed.  Finally the backup copies, the backup journal
    and the journal itself are deleted.
    """
    for f, o, _ignore in entries:
        if o or not unlink:
            try:
                # open with checkambig=True to avoid file stat ambiguity
                # when the truncated file ends up with the same size/mtime
                fp = opener(f, 'a', checkambig=True)
                fp.truncate(o)
                fp.close()
            except IOError:
                report(_("failed to truncate %s\n") % f)
                raise
        else:
            try:
                opener.unlink(f)
            except (IOError, OSError) as inst:
                # the file being already gone is fine; anything else is real
                if inst.errno != errno.ENOENT:
                    raise

    backupfiles = []
    for l, f, b, c in backupentries:
        if l not in vfsmap and c:
            report("couldn't handle %s: unknown cache location %s\n"
                   % (b, l))
        # NOTE(review): there is no 'continue' after the report above, so an
        # unknown location still raises KeyError here — confirm intended.
        vfs = vfsmap[l]
        try:
            if f and b:
                # a backup exists: copy it back over the live file
                filepath = vfs.join(f)
                backuppath = vfs.join(b)
                try:
                    util.copyfile(backuppath, filepath, checkambig=True)
                    backupfiles.append(b)
                except IOError:
                    report(_("failed to recover %s\n") % f)
            else:
                # no backup: the file (or temp file) must simply be removed
                target = f or b
                try:
                    vfs.unlink(target)
                except (IOError, OSError) as inst:
                    if inst.errno != errno.ENOENT:
                        raise
        except (IOError, OSError, error.Abort) as inst:
            # cache entries (c set) are best-effort: ignore failures
            if not c:
                raise

    backuppath = "%s.backupfiles" % journal
    if opener.exists(backuppath):
        opener.unlink(backuppath)
    opener.unlink(journal)
    try:
        for f in backupfiles:
            if opener.exists(f):
                opener.unlink(f)
    except (IOError, OSError, error.Abort) as inst:
        # only pure backup file remains, it is safe to ignore any error
        pass
101 101
class transaction(object):
    """Journaled transaction over a set of vfs-managed files.

    Records enough state (journal file, backup copies) to roll every
    registered write back if the transaction is aborted or interrupted.
    """

    def __init__(self, report, opener, vfsmap, journalname, undoname=None,
                 after=None, createmode=None, validator=None, releasefn=None):
        """Begin a new transaction

        Begins a new transaction that allows rolling back writes in the event of
        an exception.

        * `after`: called after the transaction has been committed
        * `createmode`: the mode of the journal file that will be created
        * `releasefn`: called after releasing (with transaction and result)
        """
        # nesting depth (see nest()/release()) and total usage count
        self.count = 1
        self.usages = 1
        self.report = report
        # a vfs to the store content
        self.opener = opener
        # a map to access file in various {location -> vfs}
        vfsmap = vfsmap.copy()
        vfsmap[''] = opener  # set default value
        self._vfsmap = vfsmap
        self.after = after
        # (file, offset, data) triples for append-only files
        self.entries = []
        # file -> index into self.entries
        self.map = {}
        self.journal = journalname
        self.undoname = undoname
        self._queue = []
        # A callback to validate transaction content before closing it.
        # Should raise an exception if anything is wrong.
        # target user is repository hooks.
        if validator is None:
            validator = lambda tr: None
        self.validator = validator
        # A callback to do something just after releasing transaction.
        if releasefn is None:
            releasefn = lambda tr, success: None
        self.releasefn = releasefn

        # a dict of arguments to be passed to hooks
        self.hookargs = {}
        self.file = opener.open(self.journal, "w")

        # a list of ('location', 'path', 'backuppath', cache) entries.
        # - if 'backuppath' is empty, no file existed at backup time
        # - if 'path' is empty, this is a temporary transaction file
        # - if 'location' is not empty, the path is outside main opener reach.
        #   use 'location' value as a key in a vfsmap to find the right 'vfs'
        # (cache is currently unused)
        self._backupentries = []
        self._backupmap = {}
        self._backupjournal = "%s.backupfiles" % self.journal
        self._backupsfile = opener.open(self._backupjournal, 'w')
        self._backupsfile.write('%d\n' % version)

        if createmode is not None:
            opener.chmod(self.journal, createmode & 0o666)
            opener.chmod(self._backupjournal, createmode & 0o666)

        # hold file generations to be performed on commit
        self._filegenerators = {}
        # hold callback to write pending data for hooks
        self._pendingcallback = {}
        # True if any pending data has ever been written
        self._anypending = False
        # holds callback to call when writing the transaction
        self._finalizecallback = {}
        # hold callback for post transaction close
        self._postclosecallback = {}
        # holds callbacks to call during abort
        self._abortcallback = {}

    def __del__(self):
        # best-effort rollback if the transaction object is garbage
        # collected while still open
        if self.journal:
            self._abort()

    @active
    def startgroup(self):
        """delay registration of file entry

        This is used by strip to delay vision of strip offset. The transaction
        sees either none or all of the strip actions to be done."""
        self._queue.append([])

    @active
    def endgroup(self):
        """apply delayed registration of file entry.

        This is used by strip to delay vision of strip offset. The transaction
        sees either none or all of the strip actions to be done."""
        q = self._queue.pop()
        for f, o, data in q:
            self._addentry(f, o, data)

    @active
    def add(self, file, offset, data=None):
        """record the state of an append-only file before update"""
        if file in self.map or file in self._backupmap:
            return
        if self._queue:
            # inside a group: defer until endgroup()
            self._queue[-1].append((file, offset, data))
            return

        self._addentry(file, offset, data)

    def _addentry(self, file, offset, data):
        """add a append-only entry to memory and on-disk state"""
        if file in self.map or file in self._backupmap:
            return
        self.entries.append((file, offset, data))
        self.map[file] = len(self.entries) - 1
        # add enough data to the journal to do the truncate
        self.file.write("%s\0%d\n" % (file, offset))
        self.file.flush()

    @active
    def addbackup(self, file, hardlink=True, location=''):
        """Adds a backup of the file to the transaction

        Calling addbackup() creates a hardlink backup of the specified file
        that is used to recover the file in the event of the transaction
        aborting.

        * `file`: the file path, relative to .hg/store
        * `hardlink`: use a hardlink to quickly create the backup
        """
        if self._queue:
            msg = 'cannot use transaction.addbackup inside "group"'
            raise RuntimeError(msg)

        if file in self.map or file in self._backupmap:
            return
        vfs = self._vfsmap[location]
        dirname, filename = vfs.split(file)
        backupfilename = "%s.backup.%s" % (self.journal, filename)
        backupfile = vfs.reljoin(dirname, backupfilename)
        if vfs.exists(file):
            filepath = vfs.join(file)
            backuppath = vfs.join(backupfile)
            util.copyfile(filepath, backuppath, hardlink=hardlink)
        else:
            # empty backup path marks "no file existed at backup time"
            backupfile = ''

        self._addbackupentry((location, file, backupfile, False))

    def _addbackupentry(self, entry):
        """register a new backup entry and write it to disk"""
        self._backupentries.append(entry)
        self._backupmap[entry[1]] = len(self._backupentries) - 1
        self._backupsfile.write("%s\0%s\0%s\0%d\n" % entry)
        self._backupsfile.flush()

    @active
    def registertmp(self, tmpfile, location=''):
        """register a temporary transaction file

        Such files will be deleted when the transaction exits (on both
        failure and success).
        """
        self._addbackupentry((location, '', tmpfile, False))

    @active
    def addfilegenerator(self, genid, filenames, genfunc, order=0,
                         location=''):
        """add a function to generates some files at transaction commit

        The `genfunc` argument is a function capable of generating proper
        content of each entry in the `filename` tuple.

        At transaction close time, `genfunc` will be called with one file
        object argument per entries in `filenames`.

        The transaction itself is responsible for the backup, creation and
        final write of such file.

        The `genid` argument is used to ensure the same set of file is only
        generated once. Call to `addfilegenerator` for a `genid` already
        present will overwrite the old entry.

        The `order` argument may be used to control the order in which multiple
        generator will be executed.

        The `location` arguments may be used to indicate the files are located
        outside of the standard directory for transaction. It should match
        one of the key of the `transaction.vfsmap` dictionary.
        """
        # For now, we are unable to do proper backup and restore of custom vfs
        # but for bookmarks that are handled outside this mechanism.
        self._filegenerators[genid] = (order, filenames, genfunc, location)

    def _generatefiles(self, suffix='', group=gengroupall):
        # write files registered for generation
        # NOTE(review): 'any' shadows the builtin; rename when touching this
        any = False
        for id, entry in sorted(self._filegenerators.iteritems()):
            any = True
            order, filenames, genfunc, location = entry

            # for generation at closing, check if it's before or after finalize
            postfinalize = group == gengrouppostfinalize
            if (group != gengroupall and
                (id in postfinalizegenerators) != (postfinalize)):
                continue

            vfs = self._vfsmap[location]
            files = []
            try:
                for name in filenames:
                    name += suffix
                    if suffix:
                        # '.pending' style files are temporary by nature
                        self.registertmp(name, location=location)
                    else:
                        self.addbackup(name, location=location)
                    files.append(vfs(name, 'w', atomictemp=True,
                                     checkambig=not suffix))
                genfunc(*files)
            finally:
                for f in files:
                    f.close()
        return any

    @active
    def find(self, file):
        """return the journal or backup entry registered for ``file``"""
        if file in self.map:
            return self.entries[self.map[file]]
        if file in self._backupmap:
            return self._backupentries[self._backupmap[file]]
        return None

    @active
    def replace(self, file, offset, data=None):
        '''
        replace can only replace already committed entries
        that are not pending in the queue
        '''

        if file not in self.map:
            raise KeyError(file)
        index = self.map[file]
        self.entries[index] = (file, offset, data)
        self.file.write("%s\0%d\n" % (file, offset))
        self.file.flush()

    @active
    def nest(self):
        """enter a nested scope; close()/release() must balance this"""
        self.count += 1
        self.usages += 1
        return self

    def release(self):
        """release one usage of the transaction; abort if left unclosed"""
        if self.count > 0:
            self.usages -= 1
        # if the transaction scopes are left without being closed, fail
        if self.count > 0 and self.usages == 0:
            self._abort()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # commit only on a clean exit; always release our usage
        try:
            if exc_type is None:
                self.close()
        finally:
            self.release()

    def running(self):
        """True while the transaction is open"""
        return self.count > 0

    def addpending(self, category, callback):
        """add a callback to be called when the transaction is pending

        The transaction will be given as callback's first argument.

        Category is a unique identifier to allow overwriting an old callback
        with a newer callback.
        """
        self._pendingcallback[category] = callback

    @active
    def writepending(self):
        '''write pending file to temporary version

        This is used to allow hooks to view a transaction before commit'''
        categories = sorted(self._pendingcallback)
        for cat in categories:
            # remove callback since the data will have been flushed
            any = self._pendingcallback.pop(cat)(self)
            self._anypending = self._anypending or any
        self._anypending |= self._generatefiles(suffix='.pending')
        return self._anypending

    @active
    def addfinalize(self, category, callback):
        """add a callback to be called when the transaction is closed

        The transaction will be given as callback's first argument.

        Category is a unique identifier to allow overwriting old callbacks with
        newer callbacks.
        """
        self._finalizecallback[category] = callback

    @active
    def addpostclose(self, category, callback):
        """add a callback to be called after the transaction is closed

        The transaction will be given as callback's first argument.

        Category is a unique identifier to allow overwriting an old callback
        with a newer callback.
        """
        self._postclosecallback[category] = callback

    @active
    def addabort(self, category, callback):
        """add a callback to be called when the transaction is aborted.

        The transaction will be given as the first argument to the callback.

        Category is a unique identifier to allow overwriting an old callback
        with a newer callback.
        """
        self._abortcallback[category] = callback

    @active
    def close(self):
        '''commit the transaction'''
        if self.count == 1:
            self.validator(self)  # will raise exception if needed
            self._generatefiles(group=gengroupprefinalize)
            categories = sorted(self._finalizecallback)
            for cat in categories:
                self._finalizecallback[cat](self)
            # Prevent double usage and help clear cycles.
            self._finalizecallback = None
            self._generatefiles(group=gengrouppostfinalize)

        self.count -= 1
        if self.count != 0:
            # still nested: outermost close() does the real work
            return
        self.file.close()
        self._backupsfile.close()
        # cleanup temporary files
        for l, f, b, c in self._backupentries:
            if l not in self._vfsmap and c:
                self.report("couldn't remove %s: unknown cache location %s\n"
                            % (b, l))
                continue
            vfs = self._vfsmap[l]
            if not f and b and vfs.exists(b):
                try:
                    vfs.unlink(b)
                except (IOError, OSError, error.Abort) as inst:
                    if not c:
                        raise
                    # Abort may be raised by a read-only opener
                    self.report("couldn't remove %s: %s\n"
                                % (vfs.join(b), inst))
        self.entries = []
        self._writeundo()
        if self.after:
            self.after()
        if self.opener.isfile(self._backupjournal):
            self.opener.unlink(self._backupjournal)
        if self.opener.isfile(self.journal):
            self.opener.unlink(self.journal)
        # remove the remaining backup copies now that commit succeeded
        for l, _f, b, c in self._backupentries:
            if l not in self._vfsmap and c:
                self.report("couldn't remove %s: unknown cache location"
                            "%s\n" % (b, l))
                continue
            vfs = self._vfsmap[l]
            if b and vfs.exists(b):
                try:
                    vfs.unlink(b)
                except (IOError, OSError, error.Abort) as inst:
                    if not c:
                        raise
                    # Abort may be raised by a read-only opener
                    self.report("couldn't remove %s: %s\n"
                                % (vfs.join(b), inst))
        self._backupentries = []
        self.journal = None

        self.releasefn(self, True)  # notify success of closing transaction

        # run post close action
        categories = sorted(self._postclosecallback)
        for cat in categories:
            self._postclosecallback[cat](self)
        # Prevent double usage and help clear cycles.
        self._postclosecallback = None

    @active
    def abort(self):
        '''abort the transaction (generally called on error, or when the
        transaction is not explicitly committed before going out of
        scope)'''
        self._abort()

    def _writeundo(self):
        """write transaction data for possible future undo call"""
        if self.undoname is None:
            return
        undobackupfile = self.opener.open("%s.backupfiles" % self.undoname, 'w')
        undobackupfile.write('%d\n' % version)
        for l, f, b, c in self._backupentries:
            if not f:  # temporary file
                continue
            if not b:
                u = ''
            else:
                if l not in self._vfsmap and c:
                    # NOTE(review): message says "remove" but this loop copies
                    # backups — the wording looks copy/pasted; confirm.
                    self.report("couldn't remove %s: unknown cache location"
                                "%s\n" % (b, l))
                    continue
                vfs = self._vfsmap[l]
                base, name = vfs.split(b)
                assert name.startswith(self.journal), name
                uname = name.replace(self.journal, self.undoname, 1)
                u = vfs.reljoin(base, uname)
                util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
            undobackupfile.write("%s\0%s\0%s\0%d\n" % (l, f, u, c))
        undobackupfile.close()


    def _abort(self):
        """roll back everything recorded so far and clean up journals"""
        self.count = 0
        self.usages = 0
        self.file.close()
        self._backupsfile.close()

        try:
            if not self.entries and not self._backupentries:
                # nothing was written: just remove the (empty) journals
                if self._backupjournal:
                    self.opener.unlink(self._backupjournal)
                if self.journal:
                    self.opener.unlink(self.journal)
                return

            self.report(_("transaction abort!\n"))

            try:
                for cat in sorted(self._abortcallback):
                    self._abortcallback[cat](self)
                # Prevent double usage and help clear cycles.
                self._abortcallback = None
                _playback(self.journal, self.report, self.opener, self._vfsmap,
                          self.entries, self._backupentries, False)
                self.report(_("rollback completed\n"))
            except BaseException:
                self.report(_("rollback failed - please run hg recover\n"))
        finally:
            self.journal = None
            self.releasefn(self, False)  # notify failure of transaction
556 556
def rollback(opener, vfsmap, file, report):
    """Rolls back the transaction contained in the given file

    Reads the entries in the specified file, and the corresponding
    '*.backupfiles' file, to recover from an incomplete transaction.

    * `file`: a file containing a list of entries, specifying where
    to truncate each file.  The file should contain a list of
    file\0offset pairs, delimited by newlines. The corresponding
    '*.backupfiles' file should contain a list of file\0backupfile
    pairs, delimited by \0.
    """
    entries = []
    backupentries = []

    fp = opener.open(file)
    lines = fp.readlines()
    fp.close()
    for l in lines:
        try:
            f, o = l.split('\0')
            entries.append((f, int(o), None))
        except ValueError:
            # malformed line: report and skip rather than abort the rollback
            report(_("couldn't read journal entry %r!\n") % l)

    backupjournal = "%s.backupfiles" % file
    if opener.exists(backupjournal):
        fp = opener.open(backupjournal)
        lines = fp.readlines()
        # fix: this handle was previously leaked (never closed)
        fp.close()
        if lines:
            ver = lines[0][:-1]
            if ver == str(version):
                for line in lines[1:]:
                    if line:
                        # Shave off the trailing newline
                        line = line[:-1]
                        l, f, b, c = line.split('\0')
                        # NOTE(review): c is a non-empty string ('0' or '1'),
                        # so bool(c) is always True here — the cache flag is
                        # effectively always set on readback; confirm intended
                        # before changing, since it governs error swallowing.
                        backupentries.append((l, f, b, bool(c)))
            else:
                report(_("journal was created by a different version of "
                         "Mercurial\n"))

    _playback(file, report, opener, vfsmap, entries, backupentries)
General Comments 0
You need to be logged in to leave comments. Login now