##// END OF EJS Templates
transaction: use absolute_import
Gregory Szorc -
r25986:89049011 default
parent child Browse files
Show More
@@ -1,547 +1,553
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 from i18n import _
14 from __future__ import absolute_import
15
15 16 import errno
16 import error, util
17
18 from .i18n import _
19 from . import (
20 error,
21 util,
22 )
17 23
# on-disk format version written as the first line of '*.backupfiles' journals
version = 2
19 25
def active(func):
    """Decorator: forbid use of a transaction method once it has ended.

    `count` drops to zero when the transaction is committed or aborted;
    any call after that point is a programming error and aborts."""
    def wrapper(self, *args, **kwds):
        if not self.count:
            raise error.Abort(_(
                'cannot use transaction when it is already committed/aborted'))
        return func(self, *args, **kwds)
    return wrapper
27 33
28 34 def _playback(journal, report, opener, vfsmap, entries, backupentries,
29 35 unlink=True):
30 36 for f, o, _ignore in entries:
31 37 if o or not unlink:
32 38 try:
33 39 fp = opener(f, 'a')
34 40 fp.truncate(o)
35 41 fp.close()
36 42 except IOError:
37 43 report(_("failed to truncate %s\n") % f)
38 44 raise
39 45 else:
40 46 try:
41 47 opener.unlink(f)
42 48 except (IOError, OSError) as inst:
43 49 if inst.errno != errno.ENOENT:
44 50 raise
45 51
46 52 backupfiles = []
47 53 for l, f, b, c in backupentries:
48 54 if l not in vfsmap and c:
49 55 report("couldn't handle %s: unknown cache location %s\n"
50 56 % (b, l))
51 57 vfs = vfsmap[l]
52 58 try:
53 59 if f and b:
54 60 filepath = vfs.join(f)
55 61 backuppath = vfs.join(b)
56 62 try:
57 63 util.copyfile(backuppath, filepath)
58 64 backupfiles.append(b)
59 65 except IOError:
60 66 report(_("failed to recover %s\n") % f)
61 67 else:
62 68 target = f or b
63 69 try:
64 70 vfs.unlink(target)
65 71 except (IOError, OSError) as inst:
66 72 if inst.errno != errno.ENOENT:
67 73 raise
68 74 except (IOError, OSError, util.Abort) as inst:
69 75 if not c:
70 76 raise
71 77
72 78 opener.unlink(journal)
73 79 backuppath = "%s.backupfiles" % journal
74 80 if opener.exists(backuppath):
75 81 opener.unlink(backuppath)
76 82 try:
77 83 for f in backupfiles:
78 84 if opener.exists(f):
79 85 opener.unlink(f)
80 86 except (IOError, OSError, util.Abort) as inst:
81 87 # only pure backup file remains, it is sage to ignore any error
82 88 pass
83 89
class transaction(object):
    """A journal-backed transaction over one or more vfs.

    Truncation offsets and backup copies are written to journal files so
    an interrupted run can later be rolled back by _playback()/rollback().
    """
    def __init__(self, report, opener, vfsmap, journalname, undoname=None,
                 after=None, createmode=None, validator=None):
        """Begin a new transaction

        Begins a new transaction that allows rolling back writes in the event of
        an exception.

        * `report`: callable invoked with warning/progress messages
        * `opener`: a vfs to the store content; also the default vfs
        * `vfsmap`: {location: vfs} map for files outside the main opener
        * `journalname`: name of the journal file opened through `opener`
        * `undoname`: when set, _writeundo() keeps backup data around under
          this base name for a later undo
        * `after`: called after the transaction has been committed
        * `createmode`: the mode of the journal file that will be created
        * `validator`: called with the transaction before closing; may raise
          to veto the commit (defaults to a no-op)
        """
        # number of open nested scopes; 0 once committed/aborted
        self.count = 1
        # number of users holding a reference (see nest()/release())
        self.usages = 1
        self.report = report
        # a vfs to the store content
        self.opener = opener
        # a map to access file in various {location -> vfs}
        vfsmap = vfsmap.copy()
        vfsmap[''] = opener  # set default value
        self._vfsmap = vfsmap
        self.after = after
        # (file, offset, data) records for append-only files
        self.entries = []
        # file -> index into self.entries, for dedup and replace()
        self.map = {}
        self.journal = journalname
        self.undoname = undoname
        # stack of pending entry groups (see startgroup()/endgroup())
        self._queue = []
        # A callback to validate transaction content before closing it.
        # should raise exception if anything is wrong.
        # target user is repository hooks.
        if validator is None:
            validator = lambda tr: None
        self.validator = validator
        # a dict of arguments to be passed to hooks
        self.hookargs = {}
        self.file = opener.open(self.journal, "w")

        # a list of ('location', 'path', 'backuppath', cache) entries.
        # - if 'backuppath' is empty, no file existed at backup time
        # - if 'path' is empty, this is a temporary transaction file
        # - if 'location' is not empty, the path is outside main opener reach.
        # use 'location' value as a key in a vfsmap to find the right 'vfs'
        # (cache is currently unused)
        self._backupentries = []
        self._backupmap = {}
        self._backupjournal = "%s.backupfiles" % self.journal
        self._backupsfile = opener.open(self._backupjournal, 'w')
        self._backupsfile.write('%d\n' % version)

        if createmode is not None:
            opener.chmod(self.journal, createmode & 0o666)
            opener.chmod(self._backupjournal, createmode & 0o666)

        # hold file generations to be performed on commit
        self._filegenerators = {}
        # hold callback to write pending data for hooks
        self._pendingcallback = {}
        # True if any pending data have been written ever
        self._anypending = False
        # holds callback to call when writing the transaction
        self._finalizecallback = {}
        # hold callback for post transaction close
        self._postclosecallback = {}
        # holds callbacks to call during abort
        self._abortcallback = {}
148 154
149 155 def __del__(self):
150 156 if self.journal:
151 157 self._abort()
152 158
153 159 @active
154 160 def startgroup(self):
155 161 """delay registration of file entry
156 162
157 163 This is used by strip to delay vision of strip offset. The transaction
158 164 sees either none or all of the strip actions to be done."""
159 165 self._queue.append([])
160 166
161 167 @active
162 168 def endgroup(self):
163 169 """apply delayed registration of file entry.
164 170
165 171 This is used by strip to delay vision of strip offset. The transaction
166 172 sees either none or all of the strip actions to be done."""
167 173 q = self._queue.pop()
168 174 for f, o, data in q:
169 175 self._addentry(f, o, data)
170 176
171 177 @active
172 178 def add(self, file, offset, data=None):
173 179 """record the state of an append-only file before update"""
174 180 if file in self.map or file in self._backupmap:
175 181 return
176 182 if self._queue:
177 183 self._queue[-1].append((file, offset, data))
178 184 return
179 185
180 186 self._addentry(file, offset, data)
181 187
182 188 def _addentry(self, file, offset, data):
183 189 """add a append-only entry to memory and on-disk state"""
184 190 if file in self.map or file in self._backupmap:
185 191 return
186 192 self.entries.append((file, offset, data))
187 193 self.map[file] = len(self.entries) - 1
188 194 # add enough data to the journal to do the truncate
189 195 self.file.write("%s\0%d\n" % (file, offset))
190 196 self.file.flush()
191 197
192 198 @active
193 199 def addbackup(self, file, hardlink=True, location=''):
194 200 """Adds a backup of the file to the transaction
195 201
196 202 Calling addbackup() creates a hardlink backup of the specified file
197 203 that is used to recover the file in the event of the transaction
198 204 aborting.
199 205
200 206 * `file`: the file path, relative to .hg/store
201 207 * `hardlink`: use a hardlink to quickly create the backup
202 208 """
203 209 if self._queue:
204 210 msg = 'cannot use transaction.addbackup inside "group"'
205 211 raise RuntimeError(msg)
206 212
207 213 if file in self.map or file in self._backupmap:
208 214 return
209 215 vfs = self._vfsmap[location]
210 216 dirname, filename = vfs.split(file)
211 217 backupfilename = "%s.backup.%s" % (self.journal, filename)
212 218 backupfile = vfs.reljoin(dirname, backupfilename)
213 219 if vfs.exists(file):
214 220 filepath = vfs.join(file)
215 221 backuppath = vfs.join(backupfile)
216 222 util.copyfile(filepath, backuppath, hardlink=hardlink)
217 223 else:
218 224 backupfile = ''
219 225
220 226 self._addbackupentry((location, file, backupfile, False))
221 227
222 228 def _addbackupentry(self, entry):
223 229 """register a new backup entry and write it to disk"""
224 230 self._backupentries.append(entry)
225 231 self._backupmap[entry[1]] = len(self._backupentries) - 1
226 232 self._backupsfile.write("%s\0%s\0%s\0%d\n" % entry)
227 233 self._backupsfile.flush()
228 234
    @active
    def registertmp(self, tmpfile, location=''):
        """register a temporary transaction file

        Such files will be deleted when the transaction exits (on both
        failure and success).

        * `location`: key into the vfsmap when the file lives outside the
          main store vfs
        """
        # an empty 'path' marks the entry as a temporary file (see __init__)
        self._addbackupentry((location, '', tmpfile, False))
237 243
    @active
    def addfilegenerator(self, genid, filenames, genfunc, order=0,
                         location=''):
        """add a function to generate some files at transaction commit

        The `genfunc` argument is a function capable of generating proper
        content for each entry in the `filenames` tuple.

        At transaction close time, `genfunc` will be called with one file
        object argument per entry in `filenames`.

        The transaction itself is responsible for the backup, creation and
        final write of such files.

        The `genid` argument is used to ensure the same set of files is only
        generated once. A call to `addfilegenerator` for a `genid` already
        present will overwrite the old entry.

        The `order` argument may be used to control the order in which multiple
        generators will be executed.

        The `location` argument may be used to indicate the files are located
        outside of the standard directory for transaction. It should match
        one of the keys of the `transaction.vfsmap` dictionary.
        """
        # For now, we are unable to do proper backup and restore of custom vfs
        # but for bookmarks that are handled outside this mechanism.
        self._filegenerators[genid] = (order, filenames, genfunc, location)
266 272
267 273 def _generatefiles(self, suffix=''):
268 274 # write files registered for generation
269 275 any = False
270 276 for entry in sorted(self._filegenerators.values()):
271 277 any = True
272 278 order, filenames, genfunc, location = entry
273 279 vfs = self._vfsmap[location]
274 280 files = []
275 281 try:
276 282 for name in filenames:
277 283 name += suffix
278 284 if suffix:
279 285 self.registertmp(name, location=location)
280 286 else:
281 287 self.addbackup(name, location=location)
282 288 files.append(vfs(name, 'w', atomictemp=True))
283 289 genfunc(*files)
284 290 finally:
285 291 for f in files:
286 292 f.close()
287 293 return any
288 294
289 295 @active
290 296 def find(self, file):
291 297 if file in self.map:
292 298 return self.entries[self.map[file]]
293 299 if file in self._backupmap:
294 300 return self._backupentries[self._backupmap[file]]
295 301 return None
296 302
297 303 @active
298 304 def replace(self, file, offset, data=None):
299 305 '''
300 306 replace can only replace already committed entries
301 307 that are not pending in the queue
302 308 '''
303 309
304 310 if file not in self.map:
305 311 raise KeyError(file)
306 312 index = self.map[file]
307 313 self.entries[index] = (file, offset, data)
308 314 self.file.write("%s\0%d\n" % (file, offset))
309 315 self.file.flush()
310 316
311 317 @active
312 318 def nest(self):
313 319 self.count += 1
314 320 self.usages += 1
315 321 return self
316 322
317 323 def release(self):
318 324 if self.count > 0:
319 325 self.usages -= 1
320 326 # if the transaction scopes are left without being closed, fail
321 327 if self.count > 0 and self.usages == 0:
322 328 self._abort()
323 329
324 330 def running(self):
325 331 return self.count > 0
326 332
327 333 def addpending(self, category, callback):
328 334 """add a callback to be called when the transaction is pending
329 335
330 336 The transaction will be given as callback's first argument.
331 337
332 338 Category is a unique identifier to allow overwriting an old callback
333 339 with a newer callback.
334 340 """
335 341 self._pendingcallback[category] = callback
336 342
337 343 @active
338 344 def writepending(self):
339 345 '''write pending file to temporary version
340 346
341 347 This is used to allow hooks to view a transaction before commit'''
342 348 categories = sorted(self._pendingcallback)
343 349 for cat in categories:
344 350 # remove callback since the data will have been flushed
345 351 any = self._pendingcallback.pop(cat)(self)
346 352 self._anypending = self._anypending or any
347 353 self._anypending |= self._generatefiles(suffix='.pending')
348 354 return self._anypending
349 355
350 356 @active
351 357 def addfinalize(self, category, callback):
352 358 """add a callback to be called when the transaction is closed
353 359
354 360 The transaction will be given as callback's first argument.
355 361
356 362 Category is a unique identifier to allow overwriting old callbacks with
357 363 newer callbacks.
358 364 """
359 365 self._finalizecallback[category] = callback
360 366
361 367 @active
362 368 def addpostclose(self, category, callback):
363 369 """add a callback to be called after the transaction is closed
364 370
365 371 The transaction will be given as callback's first argument.
366 372
367 373 Category is a unique identifier to allow overwriting an old callback
368 374 with a newer callback.
369 375 """
370 376 self._postclosecallback[category] = callback
371 377
372 378 @active
373 379 def addabort(self, category, callback):
374 380 """add a callback to be called when the transaction is aborted.
375 381
376 382 The transaction will be given as the first argument to the callback.
377 383
378 384 Category is a unique identifier to allow overwriting an old callback
379 385 with a newer callback.
380 386 """
381 387 self._abortcallback[category] = callback
382 388
383 389 @active
384 390 def close(self):
385 391 '''commit the transaction'''
386 392 if self.count == 1:
387 393 self.validator(self) # will raise exception if needed
388 394 self._generatefiles()
389 395 categories = sorted(self._finalizecallback)
390 396 for cat in categories:
391 397 self._finalizecallback[cat](self)
392 398
393 399 self.count -= 1
394 400 if self.count != 0:
395 401 return
396 402 self.file.close()
397 403 self._backupsfile.close()
398 404 # cleanup temporary files
399 405 for l, f, b, c in self._backupentries:
400 406 if l not in self._vfsmap and c:
401 407 self.report("couldn't remote %s: unknown cache location %s\n"
402 408 % (b, l))
403 409 continue
404 410 vfs = self._vfsmap[l]
405 411 if not f and b and vfs.exists(b):
406 412 try:
407 413 vfs.unlink(b)
408 414 except (IOError, OSError, util.Abort) as inst:
409 415 if not c:
410 416 raise
411 417 # Abort may be raise by read only opener
412 418 self.report("couldn't remote %s: %s\n"
413 419 % (vfs.join(b), inst))
414 420 self.entries = []
415 421 self._writeundo()
416 422 if self.after:
417 423 self.after()
418 424 if self.opener.isfile(self.journal):
419 425 self.opener.unlink(self.journal)
420 426 if self.opener.isfile(self._backupjournal):
421 427 self.opener.unlink(self._backupjournal)
422 428 for l, _f, b, c in self._backupentries:
423 429 if l not in self._vfsmap and c:
424 430 self.report("couldn't remote %s: unknown cache location"
425 431 "%s\n" % (b, l))
426 432 continue
427 433 vfs = self._vfsmap[l]
428 434 if b and vfs.exists(b):
429 435 try:
430 436 vfs.unlink(b)
431 437 except (IOError, OSError, util.Abort) as inst:
432 438 if not c:
433 439 raise
434 440 # Abort may be raise by read only opener
435 441 self.report("couldn't remote %s: %s\n"
436 442 % (vfs.join(b), inst))
437 443 self._backupentries = []
438 444 self.journal = None
439 445 # run post close action
440 446 categories = sorted(self._postclosecallback)
441 447 for cat in categories:
442 448 self._postclosecallback[cat](self)
443 449
444 450 @active
445 451 def abort(self):
446 452 '''abort the transaction (generally called on error, or when the
447 453 transaction is not explicitly committed before going out of
448 454 scope)'''
449 455 self._abort()
450 456
451 457 def _writeundo(self):
452 458 """write transaction data for possible future undo call"""
453 459 if self.undoname is None:
454 460 return
455 461 undobackupfile = self.opener.open("%s.backupfiles" % self.undoname, 'w')
456 462 undobackupfile.write('%d\n' % version)
457 463 for l, f, b, c in self._backupentries:
458 464 if not f: # temporary file
459 465 continue
460 466 if not b:
461 467 u = ''
462 468 else:
463 469 if l not in self._vfsmap and c:
464 470 self.report("couldn't remote %s: unknown cache location"
465 471 "%s\n" % (b, l))
466 472 continue
467 473 vfs = self._vfsmap[l]
468 474 base, name = vfs.split(b)
469 475 assert name.startswith(self.journal), name
470 476 uname = name.replace(self.journal, self.undoname, 1)
471 477 u = vfs.reljoin(base, uname)
472 478 util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
473 479 undobackupfile.write("%s\0%s\0%s\0%d\n" % (l, f, u, c))
474 480 undobackupfile.close()
475 481
476 482
    def _abort(self):
        """roll back the transaction and clean up its journal files

        Runs registered abort callbacks, replays the journal through
        _playback() and reports the outcome. Reached via abort(), release()
        and __del__().
        """
        self.count = 0
        self.usages = 0
        self.file.close()
        self._backupsfile.close()

        try:
            if not self.entries and not self._backupentries:
                # nothing was written; just remove the (empty) journal files
                if self.journal:
                    self.opener.unlink(self.journal)
                if self._backupjournal:
                    self.opener.unlink(self._backupjournal)
                return

            self.report(_("transaction abort!\n"))

            try:
                for cat in sorted(self._abortcallback):
                    self._abortcallback[cat](self)
                # unlink=False: keep zero-offset files around, truncated
                _playback(self.journal, self.report, self.opener, self._vfsmap,
                          self.entries, self._backupentries, False)
                self.report(_("rollback completed\n"))
            except BaseException:
                self.report(_("rollback failed - please run hg recover\n"))
        finally:
            # mark the transaction as ended even if playback failed
            self.journal = None
503 509
504 510
def rollback(opener, vfsmap, file, report):
    """Rolls back the transaction contained in the given file

    Reads the entries in the specified file, and the corresponding
    '*.backupfiles' file, to recover from an incomplete transaction.

    * `file`: a file containing a list of entries, specifying where
    to truncate each file. The file should contain a list of
    file\0offset pairs, delimited by newlines. The corresponding
    '*.backupfiles' file should contain a list of file\0backupfile
    pairs, delimited by \0.
    """
    entries = []
    backupentries = []

    fp = opener.open(file)
    lines = fp.readlines()
    fp.close()
    for l in lines:
        try:
            f, o = l.split('\0')
            # int() tolerates the trailing newline left on the offset
            entries.append((f, int(o), None))
        except ValueError:
            report(_("couldn't read journal entry %r!\n") % l)

    backupjournal = "%s.backupfiles" % file
    if opener.exists(backupjournal):
        fp = opener.open(backupjournal)
        lines = fp.readlines()
        if lines:
            # first line carries the format version (see `version` above)
            ver = lines[0][:-1]
            if ver == str(version):
                for line in lines[1:]:
                    if line:
                        # Shave off the trailing newline
                        line = line[:-1]
                        l, f, b, c = line.split('\0')
                        # NOTE(review): c is the string '0' or '1' here, so
                        # bool(c) is True for both; bool(int(c)) was probably
                        # intended -- confirm before changing, since it makes
                        # playback errors fatal for non-cache entries
                        backupentries.append((l, f, b, bool(c)))
            else:
                report(_("journal was created by a different version of "
                         "Mercurial\n"))

    _playback(file, report, opener, vfsmap, entries, backupentries)
General Comments 0
You need to be logged in to leave comments. Login now