##// END OF EJS Templates
merge with stable
Matt Mackall -
r25302:bcb17d7d merge default
parent child Browse files
Show More
@@ -1,547 +1,547 b''
1 1 # transaction.py - simple journaling scheme for mercurial
2 2 #
3 3 # This transaction scheme is intended to gracefully handle program
4 4 # errors and interruptions. More serious failures like system crashes
5 5 # can be recovered with an fsck-like tool. As the whole repository is
6 6 # effectively log-structured, this should amount to simply truncating
7 7 # anything that isn't referenced in the changelog.
8 8 #
9 9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 10 #
11 11 # This software may be used and distributed according to the terms of the
12 12 # GNU General Public License version 2 or any later version.
13 13
14 14 from i18n import _
15 15 import errno
16 16 import error, util
17 17
18 18 version = 2
19 19
20 20 def active(func):
21 21 def _active(self, *args, **kwds):
22 22 if self.count == 0:
23 23 raise error.Abort(_(
24 24 'cannot use transaction when it is already committed/aborted'))
25 25 return func(self, *args, **kwds)
26 26 return _active
27 27
28 28 def _playback(journal, report, opener, vfsmap, entries, backupentries,
29 29 unlink=True):
30 30 for f, o, _ignore in entries:
31 31 if o or not unlink:
32 32 try:
33 33 fp = opener(f, 'a')
34 34 fp.truncate(o)
35 35 fp.close()
36 36 except IOError:
37 37 report(_("failed to truncate %s\n") % f)
38 38 raise
39 39 else:
40 40 try:
41 41 opener.unlink(f)
42 42 except (IOError, OSError), inst:
43 43 if inst.errno != errno.ENOENT:
44 44 raise
45 45
46 46 backupfiles = []
47 47 for l, f, b, c in backupentries:
48 48 if l not in vfsmap and c:
49 49 report("couldn't handle %s: unknown cache location %s\n"
50 50 % (b, l))
51 51 vfs = vfsmap[l]
52 52 try:
53 53 if f and b:
54 54 filepath = vfs.join(f)
55 55 backuppath = vfs.join(b)
56 56 try:
57 57 util.copyfile(backuppath, filepath)
58 58 backupfiles.append(b)
59 59 except IOError:
60 60 report(_("failed to recover %s\n") % f)
61 61 else:
62 62 target = f or b
63 63 try:
64 64 vfs.unlink(target)
65 65 except (IOError, OSError), inst:
66 66 if inst.errno != errno.ENOENT:
67 67 raise
68 68 except (IOError, OSError, util.Abort), inst:
69 69 if not c:
70 70 raise
71 71
72 72 opener.unlink(journal)
73 73 backuppath = "%s.backupfiles" % journal
74 74 if opener.exists(backuppath):
75 75 opener.unlink(backuppath)
76 76 try:
77 77 for f in backupfiles:
78 78 if opener.exists(f):
79 79 opener.unlink(f)
80 80 except (IOError, OSError, util.Abort), inst:
81 81 # only pure backup file remains, it is sage to ignore any error
82 82 pass
83 83
84 84 class transaction(object):
85 85 def __init__(self, report, opener, vfsmap, journalname, undoname=None,
86 86 after=None, createmode=None, validator=None):
87 87 """Begin a new transaction
88 88
89 89 Begins a new transaction that allows rolling back writes in the event of
90 90 an exception.
91 91
92 92 * `after`: called after the transaction has been committed
93 93 * `createmode`: the mode of the journal file that will be created
94 94 """
95 95 self.count = 1
96 96 self.usages = 1
97 97 self.report = report
98 98 # a vfs to the store content
99 99 self.opener = opener
100 100 # a map to access file in various {location -> vfs}
101 101 vfsmap = vfsmap.copy()
102 102 vfsmap[''] = opener # set default value
103 103 self._vfsmap = vfsmap
104 104 self.after = after
105 105 self.entries = []
106 106 self.map = {}
107 107 self.journal = journalname
108 108 self.undoname = undoname
109 109 self._queue = []
110 110 # A callback to validate transaction content before closing it.
111 111 # should raise exception is anything is wrong.
112 112 # target user is repository hooks.
113 113 if validator is None:
114 114 validator = lambda tr: None
115 115 self.validator = validator
116 116 # a dict of arguments to be passed to hooks
117 117 self.hookargs = {}
118 118 self.file = opener.open(self.journal, "w")
119 119
120 120 # a list of ('location', 'path', 'backuppath', cache) entries.
121 121 # - if 'backuppath' is empty, no file existed at backup time
122 122 # - if 'path' is empty, this is a temporary transaction file
123 123 # - if 'location' is not empty, the path is outside main opener reach.
124 124 # use 'location' value as a key in a vfsmap to find the right 'vfs'
125 125 # (cache is currently unused)
126 126 self._backupentries = []
127 127 self._backupmap = {}
128 128 self._backupjournal = "%s.backupfiles" % self.journal
129 129 self._backupsfile = opener.open(self._backupjournal, 'w')
130 130 self._backupsfile.write('%d\n' % version)
131 131
132 132 if createmode is not None:
133 133 opener.chmod(self.journal, createmode & 0666)
134 134 opener.chmod(self._backupjournal, createmode & 0666)
135 135
136 136 # hold file generations to be performed on commit
137 137 self._filegenerators = {}
138 138 # hold callback to write pending data for hooks
139 139 self._pendingcallback = {}
140 140 # True is any pending data have been written ever
141 141 self._anypending = False
142 142 # holds callback to call when writing the transaction
143 143 self._finalizecallback = {}
144 144 # hold callback for post transaction close
145 145 self._postclosecallback = {}
146 146 # holds callbacks to call during abort
147 147 self._abortcallback = {}
148 148
149 149 def __del__(self):
150 150 if self.journal:
151 151 self._abort()
152 152
153 153 @active
154 154 def startgroup(self):
155 155 """delay registration of file entry
156 156
157 157 This is used by strip to delay vision of strip offset. The transaction
158 158 sees either none or all of the strip actions to be done."""
159 159 self._queue.append([])
160 160
161 161 @active
162 162 def endgroup(self):
163 163 """apply delayed registration of file entry.
164 164
165 165 This is used by strip to delay vision of strip offset. The transaction
166 166 sees either none or all of the strip actions to be done."""
167 167 q = self._queue.pop()
168 168 for f, o, data in q:
169 169 self._addentry(f, o, data)
170 170
171 171 @active
172 172 def add(self, file, offset, data=None):
173 173 """record the state of an append-only file before update"""
174 174 if file in self.map or file in self._backupmap:
175 175 return
176 176 if self._queue:
177 177 self._queue[-1].append((file, offset, data))
178 178 return
179 179
180 180 self._addentry(file, offset, data)
181 181
182 182 def _addentry(self, file, offset, data):
183 183 """add a append-only entry to memory and on-disk state"""
184 184 if file in self.map or file in self._backupmap:
185 185 return
186 186 self.entries.append((file, offset, data))
187 187 self.map[file] = len(self.entries) - 1
188 188 # add enough data to the journal to do the truncate
189 189 self.file.write("%s\0%d\n" % (file, offset))
190 190 self.file.flush()
191 191
192 192 @active
193 193 def addbackup(self, file, hardlink=True, location=''):
194 194 """Adds a backup of the file to the transaction
195 195
196 196 Calling addbackup() creates a hardlink backup of the specified file
197 197 that is used to recover the file in the event of the transaction
198 198 aborting.
199 199
200 200 * `file`: the file path, relative to .hg/store
201 201 * `hardlink`: use a hardlink to quickly create the backup
202 202 """
203 203 if self._queue:
204 204 msg = 'cannot use transaction.addbackup inside "group"'
205 205 raise RuntimeError(msg)
206 206
207 207 if file in self.map or file in self._backupmap:
208 208 return
209 209 vfs = self._vfsmap[location]
210 210 dirname, filename = vfs.split(file)
211 211 backupfilename = "%s.backup.%s" % (self.journal, filename)
212 212 backupfile = vfs.reljoin(dirname, backupfilename)
213 213 if vfs.exists(file):
214 214 filepath = vfs.join(file)
215 215 backuppath = vfs.join(backupfile)
216 216 util.copyfile(filepath, backuppath, hardlink=hardlink)
217 217 else:
218 218 backupfile = ''
219 219
220 220 self._addbackupentry((location, file, backupfile, False))
221 221
222 222 def _addbackupentry(self, entry):
223 223 """register a new backup entry and write it to disk"""
224 224 self._backupentries.append(entry)
225 self._backupmap[entry] = len(self._backupentries) - 1
225 self._backupmap[entry[1]] = len(self._backupentries) - 1
226 226 self._backupsfile.write("%s\0%s\0%s\0%d\n" % entry)
227 227 self._backupsfile.flush()
228 228
229 229 @active
230 230 def registertmp(self, tmpfile, location=''):
231 231 """register a temporary transaction file
232 232
233 233 Such files will be deleted when the transaction exits (on both
234 234 failure and success).
235 235 """
236 236 self._addbackupentry((location, '', tmpfile, False))
237 237
238 238 @active
239 239 def addfilegenerator(self, genid, filenames, genfunc, order=0,
240 240 location=''):
241 241 """add a function to generates some files at transaction commit
242 242
243 243 The `genfunc` argument is a function capable of generating proper
244 244 content of each entry in the `filename` tuple.
245 245
246 246 At transaction close time, `genfunc` will be called with one file
247 247 object argument per entries in `filenames`.
248 248
249 249 The transaction itself is responsible for the backup, creation and
250 250 final write of such file.
251 251
252 252 The `genid` argument is used to ensure the same set of file is only
253 253 generated once. Call to `addfilegenerator` for a `genid` already
254 254 present will overwrite the old entry.
255 255
256 256 The `order` argument may be used to control the order in which multiple
257 257 generator will be executed.
258 258
259 259 The `location` arguments may be used to indicate the files are located
260 260 outside of the the standard directory for transaction. It should match
261 261 one of the key of the `transaction.vfsmap` dictionary.
262 262 """
263 263 # For now, we are unable to do proper backup and restore of custom vfs
264 264 # but for bookmarks that are handled outside this mechanism.
265 265 self._filegenerators[genid] = (order, filenames, genfunc, location)
266 266
267 267 def _generatefiles(self, suffix=''):
268 268 # write files registered for generation
269 269 any = False
270 270 for entry in sorted(self._filegenerators.values()):
271 271 any = True
272 272 order, filenames, genfunc, location = entry
273 273 vfs = self._vfsmap[location]
274 274 files = []
275 275 try:
276 276 for name in filenames:
277 277 name += suffix
278 278 if suffix:
279 279 self.registertmp(name, location=location)
280 280 else:
281 281 self.addbackup(name, location=location)
282 282 files.append(vfs(name, 'w', atomictemp=True))
283 283 genfunc(*files)
284 284 finally:
285 285 for f in files:
286 286 f.close()
287 287 return any
288 288
289 289 @active
290 290 def find(self, file):
291 291 if file in self.map:
292 292 return self.entries[self.map[file]]
293 293 if file in self._backupmap:
294 294 return self._backupentries[self._backupmap[file]]
295 295 return None
296 296
297 297 @active
298 298 def replace(self, file, offset, data=None):
299 299 '''
300 300 replace can only replace already committed entries
301 301 that are not pending in the queue
302 302 '''
303 303
304 304 if file not in self.map:
305 305 raise KeyError(file)
306 306 index = self.map[file]
307 307 self.entries[index] = (file, offset, data)
308 308 self.file.write("%s\0%d\n" % (file, offset))
309 309 self.file.flush()
310 310
311 311 @active
312 312 def nest(self):
313 313 self.count += 1
314 314 self.usages += 1
315 315 return self
316 316
317 317 def release(self):
318 318 if self.count > 0:
319 319 self.usages -= 1
320 320 # if the transaction scopes are left without being closed, fail
321 321 if self.count > 0 and self.usages == 0:
322 322 self._abort()
323 323
324 324 def running(self):
325 325 return self.count > 0
326 326
327 327 def addpending(self, category, callback):
328 328 """add a callback to be called when the transaction is pending
329 329
330 330 The transaction will be given as callback's first argument.
331 331
332 332 Category is a unique identifier to allow overwriting an old callback
333 333 with a newer callback.
334 334 """
335 335 self._pendingcallback[category] = callback
336 336
337 337 @active
338 338 def writepending(self):
339 339 '''write pending file to temporary version
340 340
341 341 This is used to allow hooks to view a transaction before commit'''
342 342 categories = sorted(self._pendingcallback)
343 343 for cat in categories:
344 344 # remove callback since the data will have been flushed
345 345 any = self._pendingcallback.pop(cat)(self)
346 346 self._anypending = self._anypending or any
347 347 self._anypending |= self._generatefiles(suffix='.pending')
348 348 return self._anypending
349 349
350 350 @active
351 351 def addfinalize(self, category, callback):
352 352 """add a callback to be called when the transaction is closed
353 353
354 354 The transaction will be given as callback's first argument.
355 355
356 356 Category is a unique identifier to allow overwriting old callbacks with
357 357 newer callbacks.
358 358 """
359 359 self._finalizecallback[category] = callback
360 360
361 361 @active
362 362 def addpostclose(self, category, callback):
363 363 """add a callback to be called after the transaction is closed
364 364
365 365 The transaction will be given as callback's first argument.
366 366
367 367 Category is a unique identifier to allow overwriting an old callback
368 368 with a newer callback.
369 369 """
370 370 self._postclosecallback[category] = callback
371 371
372 372 @active
373 373 def addabort(self, category, callback):
374 374 """add a callback to be called when the transaction is aborted.
375 375
376 376 The transaction will be given as the first argument to the callback.
377 377
378 378 Category is a unique identifier to allow overwriting an old callback
379 379 with a newer callback.
380 380 """
381 381 self._abortcallback[category] = callback
382 382
383 383 @active
384 384 def close(self):
385 385 '''commit the transaction'''
386 386 if self.count == 1:
387 387 self.validator(self) # will raise exception if needed
388 388 self._generatefiles()
389 389 categories = sorted(self._finalizecallback)
390 390 for cat in categories:
391 391 self._finalizecallback[cat](self)
392 392
393 393 self.count -= 1
394 394 if self.count != 0:
395 395 return
396 396 self.file.close()
397 397 self._backupsfile.close()
398 398 # cleanup temporary files
399 399 for l, f, b, c in self._backupentries:
400 400 if l not in self._vfsmap and c:
401 401 self.report("couldn't remote %s: unknown cache location %s\n"
402 402 % (b, l))
403 403 continue
404 404 vfs = self._vfsmap[l]
405 405 if not f and b and vfs.exists(b):
406 406 try:
407 407 vfs.unlink(b)
408 408 except (IOError, OSError, util.Abort), inst:
409 409 if not c:
410 410 raise
411 411 # Abort may be raise by read only opener
412 412 self.report("couldn't remote %s: %s\n"
413 413 % (vfs.join(b), inst))
414 414 self.entries = []
415 415 self._writeundo()
416 416 if self.after:
417 417 self.after()
418 418 if self.opener.isfile(self.journal):
419 419 self.opener.unlink(self.journal)
420 420 if self.opener.isfile(self._backupjournal):
421 421 self.opener.unlink(self._backupjournal)
422 422 for l, _f, b, c in self._backupentries:
423 423 if l not in self._vfsmap and c:
424 424 self.report("couldn't remote %s: unknown cache location"
425 425 "%s\n" % (b, l))
426 426 continue
427 427 vfs = self._vfsmap[l]
428 428 if b and vfs.exists(b):
429 429 try:
430 430 vfs.unlink(b)
431 431 except (IOError, OSError, util.Abort), inst:
432 432 if not c:
433 433 raise
434 434 # Abort may be raise by read only opener
435 435 self.report("couldn't remote %s: %s\n"
436 436 % (vfs.join(b), inst))
437 437 self._backupentries = []
438 438 self.journal = None
439 439 # run post close action
440 440 categories = sorted(self._postclosecallback)
441 441 for cat in categories:
442 442 self._postclosecallback[cat](self)
443 443
444 444 @active
445 445 def abort(self):
446 446 '''abort the transaction (generally called on error, or when the
447 447 transaction is not explicitly committed before going out of
448 448 scope)'''
449 449 self._abort()
450 450
451 451 def _writeundo(self):
452 452 """write transaction data for possible future undo call"""
453 453 if self.undoname is None:
454 454 return
455 455 undobackupfile = self.opener.open("%s.backupfiles" % self.undoname, 'w')
456 456 undobackupfile.write('%d\n' % version)
457 457 for l, f, b, c in self._backupentries:
458 458 if not f: # temporary file
459 459 continue
460 460 if not b:
461 461 u = ''
462 462 else:
463 463 if l not in self._vfsmap and c:
464 464 self.report("couldn't remote %s: unknown cache location"
465 465 "%s\n" % (b, l))
466 466 continue
467 467 vfs = self._vfsmap[l]
468 468 base, name = vfs.split(b)
469 469 assert name.startswith(self.journal), name
470 470 uname = name.replace(self.journal, self.undoname, 1)
471 471 u = vfs.reljoin(base, uname)
472 472 util.copyfile(vfs.join(b), vfs.join(u), hardlink=True)
473 473 undobackupfile.write("%s\0%s\0%s\0%d\n" % (l, f, u, c))
474 474 undobackupfile.close()
475 475
476 476
477 477 def _abort(self):
478 478 self.count = 0
479 479 self.usages = 0
480 480 self.file.close()
481 481 self._backupsfile.close()
482 482
483 483 try:
484 484 if not self.entries and not self._backupentries:
485 485 if self.journal:
486 486 self.opener.unlink(self.journal)
487 487 if self._backupjournal:
488 488 self.opener.unlink(self._backupjournal)
489 489 return
490 490
491 491 self.report(_("transaction abort!\n"))
492 492
493 493 try:
494 494 for cat in sorted(self._abortcallback):
495 495 self._abortcallback[cat](self)
496 496 _playback(self.journal, self.report, self.opener, self._vfsmap,
497 497 self.entries, self._backupentries, False)
498 498 self.report(_("rollback completed\n"))
499 499 except BaseException:
500 500 self.report(_("rollback failed - please run hg recover\n"))
501 501 finally:
502 502 self.journal = None
503 503
504 504
505 505 def rollback(opener, vfsmap, file, report):
506 506 """Rolls back the transaction contained in the given file
507 507
508 508 Reads the entries in the specified file, and the corresponding
509 509 '*.backupfiles' file, to recover from an incomplete transaction.
510 510
511 511 * `file`: a file containing a list of entries, specifying where
512 512 to truncate each file. The file should contain a list of
513 513 file\0offset pairs, delimited by newlines. The corresponding
514 514 '*.backupfiles' file should contain a list of file\0backupfile
515 515 pairs, delimited by \0.
516 516 """
517 517 entries = []
518 518 backupentries = []
519 519
520 520 fp = opener.open(file)
521 521 lines = fp.readlines()
522 522 fp.close()
523 523 for l in lines:
524 524 try:
525 525 f, o = l.split('\0')
526 526 entries.append((f, int(o), None))
527 527 except ValueError:
528 528 report(_("couldn't read journal entry %r!\n") % l)
529 529
530 530 backupjournal = "%s.backupfiles" % file
531 531 if opener.exists(backupjournal):
532 532 fp = opener.open(backupjournal)
533 533 lines = fp.readlines()
534 534 if lines:
535 535 ver = lines[0][:-1]
536 536 if ver == str(version):
537 537 for line in lines[1:]:
538 538 if line:
539 539 # Shave off the trailing newline
540 540 line = line[:-1]
541 541 l, f, b, c = line.split('\0')
542 542 backupentries.append((l, f, b, bool(c)))
543 543 else:
544 544 report(_("journal was created by a different version of "
545 545 "Mercurial\n"))
546 546
547 547 _playback(file, report, opener, vfsmap, entries, backupentries)
General Comments 0
You need to be logged in to leave comments. Login now