##// END OF EJS Templates
transaction: clear callback instances after usage...
Gregory Szorc -
r28960:14e683d6 default
parent child Browse files
Show More
@@ -1,593 +1,599
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import errno
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 error,
21 21 util,
22 22 )
23 23
24 24 version = 2
25 25
26 26 # These are the file generators that should only be executed after the
27 27 # finalizers are done, since they rely on the output of the finalizers (like
28 28 # the changelog having been written).
29 29 postfinalizegenerators = set([
30 30 'bookmarks',
31 31 'dirstate'
32 32 ])
33 33
34 34 class GenerationGroup(object):
35 35 ALL='all'
36 36 PREFINALIZE='prefinalize'
37 37 POSTFINALIZE='postfinalize'
38 38
39 39 def active(func):
40 40 def _active(self, *args, **kwds):
41 41 if self.count == 0:
42 42 raise error.Abort(_(
43 43 'cannot use transaction when it is already committed/aborted'))
44 44 return func(self, *args, **kwds)
45 45 return _active
46 46
47 47 def _playback(journal, report, opener, vfsmap, entries, backupentries,
48 48 unlink=True):
49 49 for f, o, _ignore in entries:
50 50 if o or not unlink:
51 51 try:
52 52 fp = opener(f, 'a')
53 53 fp.truncate(o)
54 54 fp.close()
55 55 except IOError:
56 56 report(_("failed to truncate %s\n") % f)
57 57 raise
58 58 else:
59 59 try:
60 60 opener.unlink(f)
61 61 except (IOError, OSError) as inst:
62 62 if inst.errno != errno.ENOENT:
63 63 raise
64 64
65 65 backupfiles = []
66 66 for l, f, b, c in backupentries:
67 67 if l not in vfsmap and c:
68 68 report("couldn't handle %s: unknown cache location %s\n"
69 69 % (b, l))
70 70 vfs = vfsmap[l]
71 71 try:
72 72 if f and b:
73 73 filepath = vfs.join(f)
74 74 backuppath = vfs.join(b)
75 75 try:
76 76 util.copyfile(backuppath, filepath)
77 77 backupfiles.append(b)
78 78 except IOError:
79 79 report(_("failed to recover %s\n") % f)
80 80 else:
81 81 target = f or b
82 82 try:
83 83 vfs.unlink(target)
84 84 except (IOError, OSError) as inst:
85 85 if inst.errno != errno.ENOENT:
86 86 raise
87 87 except (IOError, OSError, error.Abort) as inst:
88 88 if not c:
89 89 raise
90 90
91 91 backuppath = "%s.backupfiles" % journal
92 92 if opener.exists(backuppath):
93 93 opener.unlink(backuppath)
94 94 opener.unlink(journal)
95 95 try:
96 96 for f in backupfiles:
97 97 if opener.exists(f):
98 98 opener.unlink(f)
99 99 except (IOError, OSError, error.Abort) as inst:
100 100 # only pure backup file remains, it is sage to ignore any error
101 101 pass
102 102
103 103 class transaction(object):
104 104 def __init__(self, report, opener, vfsmap, journalname, undoname=None,
105 105 after=None, createmode=None, validator=None, releasefn=None):
106 106 """Begin a new transaction
107 107
108 108 Begins a new transaction that allows rolling back writes in the event of
109 109 an exception.
110 110
111 111 * `after`: called after the transaction has been committed
112 112 * `createmode`: the mode of the journal file that will be created
113 113 * `releasefn`: called after releasing (with transaction and result)
114 114 """
115 115 self.count = 1
116 116 self.usages = 1
117 117 self.report = report
118 118 # a vfs to the store content
119 119 self.opener = opener
120 120 # a map to access file in various {location -> vfs}
121 121 vfsmap = vfsmap.copy()
122 122 vfsmap[''] = opener # set default value
123 123 self._vfsmap = vfsmap
124 124 self.after = after
125 125 self.entries = []
126 126 self.map = {}
127 127 self.journal = journalname
128 128 self.undoname = undoname
129 129 self._queue = []
130 130 # A callback to validate transaction content before closing it.
131 131 # should raise exception is anything is wrong.
132 132 # target user is repository hooks.
133 133 if validator is None:
134 134 validator = lambda tr: None
135 135 self.validator = validator
136 136 # A callback to do something just after releasing transaction.
137 137 if releasefn is None:
138 138 releasefn = lambda tr, success: None
139 139 self.releasefn = releasefn
140 140
141 141 # a dict of arguments to be passed to hooks
142 142 self.hookargs = {}
143 143 self.file = opener.open(self.journal, "w")
144 144
145 145 # a list of ('location', 'path', 'backuppath', cache) entries.
146 146 # - if 'backuppath' is empty, no file existed at backup time
147 147 # - if 'path' is empty, this is a temporary transaction file
148 148 # - if 'location' is not empty, the path is outside main opener reach.
149 149 # use 'location' value as a key in a vfsmap to find the right 'vfs'
150 150 # (cache is currently unused)
151 151 self._backupentries = []
152 152 self._backupmap = {}
153 153 self._backupjournal = "%s.backupfiles" % self.journal
154 154 self._backupsfile = opener.open(self._backupjournal, 'w')
155 155 self._backupsfile.write('%d\n' % version)
156 156
157 157 if createmode is not None:
158 158 opener.chmod(self.journal, createmode & 0o666)
159 159 opener.chmod(self._backupjournal, createmode & 0o666)
160 160
161 161 # hold file generations to be performed on commit
162 162 self._filegenerators = {}
163 163 # hold callback to write pending data for hooks
164 164 self._pendingcallback = {}
165 165 # True is any pending data have been written ever
166 166 self._anypending = False
167 167 # holds callback to call when writing the transaction
168 168 self._finalizecallback = {}
169 169 # hold callback for post transaction close
170 170 self._postclosecallback = {}
171 171 # holds callbacks to call during abort
172 172 self._abortcallback = {}
173 173
174 174 def __del__(self):
175 175 if self.journal:
176 176 self._abort()
177 177
178 178 @active
179 179 def startgroup(self):
180 180 """delay registration of file entry
181 181
182 182 This is used by strip to delay vision of strip offset. The transaction
183 183 sees either none or all of the strip actions to be done."""
184 184 self._queue.append([])
185 185
186 186 @active
187 187 def endgroup(self):
188 188 """apply delayed registration of file entry.
189 189
190 190 This is used by strip to delay vision of strip offset. The transaction
191 191 sees either none or all of the strip actions to be done."""
192 192 q = self._queue.pop()
193 193 for f, o, data in q:
194 194 self._addentry(f, o, data)
195 195
196 196 @active
197 197 def add(self, file, offset, data=None):
198 198 """record the state of an append-only file before update"""
199 199 if file in self.map or file in self._backupmap:
200 200 return
201 201 if self._queue:
202 202 self._queue[-1].append((file, offset, data))
203 203 return
204 204
205 205 self._addentry(file, offset, data)
206 206
207 207 def _addentry(self, file, offset, data):
208 208 """add a append-only entry to memory and on-disk state"""
209 209 if file in self.map or file in self._backupmap:
210 210 return
211 211 self.entries.append((file, offset, data))
212 212 self.map[file] = len(self.entries) - 1
213 213 # add enough data to the journal to do the truncate
214 214 self.file.write("%s\0%d\n" % (file, offset))
215 215 self.file.flush()
216 216
217 217 @active
218 218 def addbackup(self, file, hardlink=True, location=''):
219 219 """Adds a backup of the file to the transaction
220 220
221 221 Calling addbackup() creates a hardlink backup of the specified file
222 222 that is used to recover the file in the event of the transaction
223 223 aborting.
224 224
225 225 * `file`: the file path, relative to .hg/store
226 226 * `hardlink`: use a hardlink to quickly create the backup
227 227 """
228 228 if self._queue:
229 229 msg = 'cannot use transaction.addbackup inside "group"'
230 230 raise RuntimeError(msg)
231 231
232 232 if file in self.map or file in self._backupmap:
233 233 return
234 234 vfs = self._vfsmap[location]
235 235 dirname, filename = vfs.split(file)
236 236 backupfilename = "%s.backup.%s" % (self.journal, filename)
237 237 backupfile = vfs.reljoin(dirname, backupfilename)
238 238 if vfs.exists(file):
239 239 filepath = vfs.join(file)
240 240 backuppath = vfs.join(backupfile)
241 241 util.copyfile(filepath, backuppath, hardlink=hardlink)
242 242 else:
243 243 backupfile = ''
244 244
245 245 self._addbackupentry((location, file, backupfile, False))
246 246
247 247 def _addbackupentry(self, entry):
248 248 """register a new backup entry and write it to disk"""
249 249 self._backupentries.append(entry)
250 250 self._backupmap[entry[1]] = len(self._backupentries) - 1
251 251 self._backupsfile.write("%s\0%s\0%s\0%d\n" % entry)
252 252 self._backupsfile.flush()
253 253
254 254 @active
255 255 def registertmp(self, tmpfile, location=''):
256 256 """register a temporary transaction file
257 257
258 258 Such files will be deleted when the transaction exits (on both
259 259 failure and success).
260 260 """
261 261 self._addbackupentry((location, '', tmpfile, False))
262 262
263 263 @active
264 264 def addfilegenerator(self, genid, filenames, genfunc, order=0,
265 265 location=''):
266 266 """add a function to generates some files at transaction commit
267 267
268 268 The `genfunc` argument is a function capable of generating proper
269 269 content of each entry in the `filename` tuple.
270 270
271 271 At transaction close time, `genfunc` will be called with one file
272 272 object argument per entries in `filenames`.
273 273
274 274 The transaction itself is responsible for the backup, creation and
275 275 final write of such file.
276 276
277 277 The `genid` argument is used to ensure the same set of file is only
278 278 generated once. Call to `addfilegenerator` for a `genid` already
279 279 present will overwrite the old entry.
280 280
281 281 The `order` argument may be used to control the order in which multiple
282 282 generator will be executed.
283 283
284 284 The `location` arguments may be used to indicate the files are located
285 285 outside of the the standard directory for transaction. It should match
286 286 one of the key of the `transaction.vfsmap` dictionary.
287 287 """
288 288 # For now, we are unable to do proper backup and restore of custom vfs
289 289 # but for bookmarks that are handled outside this mechanism.
290 290 self._filegenerators[genid] = (order, filenames, genfunc, location)
291 291
292 292 def _generatefiles(self, suffix='', group=GenerationGroup.ALL):
293 293 # write files registered for generation
294 294 any = False
295 295 for id, entry in sorted(self._filegenerators.iteritems()):
296 296 any = True
297 297 order, filenames, genfunc, location = entry
298 298
299 299 # for generation at closing, check if it's before or after finalize
300 300 postfinalize = group == GenerationGroup.POSTFINALIZE
301 301 if (group != GenerationGroup.ALL and
302 302 (id in postfinalizegenerators) != (postfinalize)):
303 303 continue
304 304
305 305 vfs = self._vfsmap[location]
306 306 files = []
307 307 try:
308 308 for name in filenames:
309 309 name += suffix
310 310 if suffix:
311 311 self.registertmp(name, location=location)
312 312 else:
313 313 self.addbackup(name, location=location)
314 314 files.append(vfs(name, 'w', atomictemp=True))
315 315 genfunc(*files)
316 316 finally:
317 317 for f in files:
318 318 f.close()
319 319 return any
320 320
321 321 @active
322 322 def find(self, file):
323 323 if file in self.map:
324 324 return self.entries[self.map[file]]
325 325 if file in self._backupmap:
326 326 return self._backupentries[self._backupmap[file]]
327 327 return None
328 328
329 329 @active
330 330 def replace(self, file, offset, data=None):
331 331 '''
332 332 replace can only replace already committed entries
333 333 that are not pending in the queue
334 334 '''
335 335
336 336 if file not in self.map:
337 337 raise KeyError(file)
338 338 index = self.map[file]
339 339 self.entries[index] = (file, offset, data)
340 340 self.file.write("%s\0%d\n" % (file, offset))
341 341 self.file.flush()
342 342
343 343 @active
344 344 def nest(self):
345 345 self.count += 1
346 346 self.usages += 1
347 347 return self
348 348
349 349 def release(self):
350 350 if self.count > 0:
351 351 self.usages -= 1
352 352 # if the transaction scopes are left without being closed, fail
353 353 if self.count > 0 and self.usages == 0:
354 354 self._abort()
355 355
356 356 def __enter__(self):
357 357 return self
358 358
359 359 def __exit__(self, exc_type, exc_val, exc_tb):
360 360 try:
361 361 if exc_type is None:
362 362 self.close()
363 363 finally:
364 364 self.release()
365 365
366 366 def running(self):
367 367 return self.count > 0
368 368
369 369 def addpending(self, category, callback):
370 370 """add a callback to be called when the transaction is pending
371 371
372 372 The transaction will be given as callback's first argument.
373 373
374 374 Category is a unique identifier to allow overwriting an old callback
375 375 with a newer callback.
376 376 """
377 377 self._pendingcallback[category] = callback
378 378
379 379 @active
380 380 def writepending(self):
381 381 '''write pending file to temporary version
382 382
383 383 This is used to allow hooks to view a transaction before commit'''
384 384 categories = sorted(self._pendingcallback)
385 385 for cat in categories:
386 386 # remove callback since the data will have been flushed
387 387 any = self._pendingcallback.pop(cat)(self)
388 388 self._anypending = self._anypending or any
389 389 self._anypending |= self._generatefiles(suffix='.pending')
390 390 return self._anypending
391 391
392 392 @active
393 393 def addfinalize(self, category, callback):
394 394 """add a callback to be called when the transaction is closed
395 395
396 396 The transaction will be given as callback's first argument.
397 397
398 398 Category is a unique identifier to allow overwriting old callbacks with
399 399 newer callbacks.
400 400 """
401 401 self._finalizecallback[category] = callback
402 402
403 403 @active
404 404 def addpostclose(self, category, callback):
405 405 """add a callback to be called after the transaction is closed
406 406
407 407 The transaction will be given as callback's first argument.
408 408
409 409 Category is a unique identifier to allow overwriting an old callback
410 410 with a newer callback.
411 411 """
412 412 self._postclosecallback[category] = callback
413 413
414 414 @active
415 415 def addabort(self, category, callback):
416 416 """add a callback to be called when the transaction is aborted.
417 417
418 418 The transaction will be given as the first argument to the callback.
419 419
420 420 Category is a unique identifier to allow overwriting an old callback
421 421 with a newer callback.
422 422 """
423 423 self._abortcallback[category] = callback
424 424
425 425 @active
426 426 def close(self):
427 427 '''commit the transaction'''
428 428 if self.count == 1:
429 429 self.validator(self) # will raise exception if needed
430 430 self._generatefiles(group=GenerationGroup.PREFINALIZE)
431 431 categories = sorted(self._finalizecallback)
432 432 for cat in categories:
433 433 self._finalizecallback[cat](self)
434 # Prevent double usage and help clear cycles.
435 self._finalizecallback = None
434 436 self._generatefiles(group=GenerationGroup.POSTFINALIZE)
435 437
436 438 self.count -= 1
437 439 if self.count != 0:
438 440 return
439 441 self.file.close()
440 442 self._backupsfile.close()
441 443 # cleanup temporary files
442 444 for l, f, b, c in self._backupentries:
443 445 if l not in self._vfsmap and c:
444 446 self.report("couldn't remove %s: unknown cache location %s\n"
445 447 % (b, l))
446 448 continue
447 449 vfs = self._vfsmap[l]
448 450 if not f and b and vfs.exists(b):
449 451 try:
450 452 vfs.unlink(b)
451 453 except (IOError, OSError, error.Abort) as inst:
452 454 if not c:
453 455 raise
454 456 # Abort may be raise by read only opener
455 457 self.report("couldn't remove %s: %s\n"
456 458 % (vfs.join(b), inst))
457 459 self.entries = []
458 460 self._writeundo()
459 461 if self.after:
460 462 self.after()
461 463 if self.opener.isfile(self._backupjournal):
462 464 self.opener.unlink(self._backupjournal)
463 465 if self.opener.isfile(self.journal):
464 466 self.opener.unlink(self.journal)
465 467 for l, _f, b, c in self._backupentries:
466 468 if l not in self._vfsmap and c:
467 469 self.report("couldn't remove %s: unknown cache location"
468 470 "%s\n" % (b, l))
469 471 continue
470 472 vfs = self._vfsmap[l]
471 473 if b and vfs.exists(b):
472 474 try:
473 475 vfs.unlink(b)
474 476 except (IOError, OSError, error.Abort) as inst:
475 477 if not c:
476 478 raise
477 479 # Abort may be raise by read only opener
478 480 self.report("couldn't remove %s: %s\n"
479 481 % (vfs.join(b), inst))
480 482 self._backupentries = []
481 483 self.journal = None
482 484
483 485 self.releasefn(self, True) # notify success of closing transaction
484 486
485 487 # run post close action
486 488 categories = sorted(self._postclosecallback)
487 489 for cat in categories:
488 490 self._postclosecallback[cat](self)
491 # Prevent double usage and help clear cycles.
492 self._postclosecallback = None
489 493
490 494 @active
491 495 def abort(self):
492 496 '''abort the transaction (generally called on error, or when the
493 497 transaction is not explicitly committed before going out of
494 498 scope)'''
495 499 self._abort()
496 500
497 501 def _writeundo(self):
498 502 """write transaction data for possible future undo call"""
499 503 if self.undoname is None:
500 504 return
501 505 undobackupfile = self.opener.open("%s.backupfiles" % self.undoname, 'w')
502 506 undobackupfile.write('%d\n' % version)
503 507 for l, f, b, c in self._backupentries:
504 508 if not f: # temporary file
505 509 continue
506 510 if not b:
507 511 u = ''
508 512 else:
509 513 if l not in self._vfsmap and c:
510 514 self.report("couldn't remove %s: unknown cache location"
511 515 "%s\n" % (b, l))
512 516 continue
513 517 vfs = self._vfsmap[l]
514 518 base, name = vfs.split(b)
515 519 assert name.startswith(self.journal), name
516 520 uname = name.replace(self.journal, self.undoname, 1)
517 521 u = vfs.reljoin(base, uname)
518 522 util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
519 523 undobackupfile.write("%s\0%s\0%s\0%d\n" % (l, f, u, c))
520 524 undobackupfile.close()
521 525
522 526
523 527 def _abort(self):
524 528 self.count = 0
525 529 self.usages = 0
526 530 self.file.close()
527 531 self._backupsfile.close()
528 532
529 533 try:
530 534 if not self.entries and not self._backupentries:
531 535 if self._backupjournal:
532 536 self.opener.unlink(self._backupjournal)
533 537 if self.journal:
534 538 self.opener.unlink(self.journal)
535 539 return
536 540
537 541 self.report(_("transaction abort!\n"))
538 542
539 543 try:
540 544 for cat in sorted(self._abortcallback):
541 545 self._abortcallback[cat](self)
546 # Prevent double usage and help clear cycles.
547 self._abortcallback = None
542 548 _playback(self.journal, self.report, self.opener, self._vfsmap,
543 549 self.entries, self._backupentries, False)
544 550 self.report(_("rollback completed\n"))
545 551 except BaseException:
546 552 self.report(_("rollback failed - please run hg recover\n"))
547 553 finally:
548 554 self.journal = None
549 555 self.releasefn(self, False) # notify failure of transaction
550 556
551 557 def rollback(opener, vfsmap, file, report):
552 558 """Rolls back the transaction contained in the given file
553 559
554 560 Reads the entries in the specified file, and the corresponding
555 561 '*.backupfiles' file, to recover from an incomplete transaction.
556 562
557 563 * `file`: a file containing a list of entries, specifying where
558 564 to truncate each file. The file should contain a list of
559 565 file\0offset pairs, delimited by newlines. The corresponding
560 566 '*.backupfiles' file should contain a list of file\0backupfile
561 567 pairs, delimited by \0.
562 568 """
563 569 entries = []
564 570 backupentries = []
565 571
566 572 fp = opener.open(file)
567 573 lines = fp.readlines()
568 574 fp.close()
569 575 for l in lines:
570 576 try:
571 577 f, o = l.split('\0')
572 578 entries.append((f, int(o), None))
573 579 except ValueError:
574 580 report(_("couldn't read journal entry %r!\n") % l)
575 581
576 582 backupjournal = "%s.backupfiles" % file
577 583 if opener.exists(backupjournal):
578 584 fp = opener.open(backupjournal)
579 585 lines = fp.readlines()
580 586 if lines:
581 587 ver = lines[0][:-1]
582 588 if ver == str(version):
583 589 for line in lines[1:]:
584 590 if line:
585 591 # Shave off the trailing newline
586 592 line = line[:-1]
587 593 l, f, b, c = line.split('\0')
588 594 backupentries.append((l, f, b, bool(c)))
589 595 else:
590 596 report(_("journal was created by a different version of "
591 597 "Mercurial\n"))
592 598
593 599 _playback(file, report, opener, vfsmap, entries, backupentries)
General Comments 0
You need to be logged in to leave comments. Login now