##// END OF EJS Templates
transaction: fix file descriptor leak for journal.backupfiles...
Durham Goode -
r21206:c7741893 stable
parent child Browse files
Show More
@@ -1,305 +1,307 b''
1 # transaction.py - simple journaling scheme for mercurial
1 # transaction.py - simple journaling scheme for mercurial
2 #
2 #
3 # This transaction scheme is intended to gracefully handle program
3 # This transaction scheme is intended to gracefully handle program
4 # errors and interruptions. More serious failures like system crashes
4 # errors and interruptions. More serious failures like system crashes
5 # can be recovered with an fsck-like tool. As the whole repository is
5 # can be recovered with an fsck-like tool. As the whole repository is
6 # effectively log-structured, this should amount to simply truncating
6 # effectively log-structured, this should amount to simply truncating
7 # anything that isn't referenced in the changelog.
7 # anything that isn't referenced in the changelog.
8 #
8 #
9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
9 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
10 #
10 #
11 # This software may be used and distributed according to the terms of the
11 # This software may be used and distributed according to the terms of the
12 # GNU General Public License version 2 or any later version.
12 # GNU General Public License version 2 or any later version.
13
13
14 from i18n import _
14 from i18n import _
15 import errno
15 import errno
16 import error, util
16 import error, util
17
17
18 def active(func):
18 def active(func):
19 def _active(self, *args, **kwds):
19 def _active(self, *args, **kwds):
20 if self.count == 0:
20 if self.count == 0:
21 raise error.Abort(_(
21 raise error.Abort(_(
22 'cannot use transaction when it is already committed/aborted'))
22 'cannot use transaction when it is already committed/aborted'))
23 return func(self, *args, **kwds)
23 return func(self, *args, **kwds)
24 return _active
24 return _active
25
25
26 def _playback(journal, report, opener, entries, backupentries, unlink=True):
26 def _playback(journal, report, opener, entries, backupentries, unlink=True):
27 for f, o, ignore in entries:
27 for f, o, ignore in entries:
28 if o or not unlink:
28 if o or not unlink:
29 try:
29 try:
30 fp = opener(f, 'a')
30 fp = opener(f, 'a')
31 fp.truncate(o)
31 fp.truncate(o)
32 fp.close()
32 fp.close()
33 except IOError:
33 except IOError:
34 report(_("failed to truncate %s\n") % f)
34 report(_("failed to truncate %s\n") % f)
35 raise
35 raise
36 else:
36 else:
37 try:
37 try:
38 opener.unlink(f)
38 opener.unlink(f)
39 except (IOError, OSError), inst:
39 except (IOError, OSError), inst:
40 if inst.errno != errno.ENOENT:
40 if inst.errno != errno.ENOENT:
41 raise
41 raise
42
42
43 backupfiles = []
43 backupfiles = []
44 for f, b, ignore in backupentries:
44 for f, b, ignore in backupentries:
45 filepath = opener.join(f)
45 filepath = opener.join(f)
46 backuppath = opener.join(b)
46 backuppath = opener.join(b)
47 try:
47 try:
48 util.copyfile(backuppath, filepath)
48 util.copyfile(backuppath, filepath)
49 backupfiles.append(b)
49 backupfiles.append(b)
50 except IOError:
50 except IOError:
51 report(_("failed to recover %s\n") % f)
51 report(_("failed to recover %s\n") % f)
52 raise
52 raise
53
53
54 opener.unlink(journal)
54 opener.unlink(journal)
55 backuppath = "%s.backupfiles" % journal
55 backuppath = "%s.backupfiles" % journal
56 if opener.exists(backuppath):
56 if opener.exists(backuppath):
57 opener.unlink(backuppath)
57 opener.unlink(backuppath)
58 for f in backupfiles:
58 for f in backupfiles:
59 opener.unlink(f)
59 opener.unlink(f)
60
60
61 class transaction(object):
61 class transaction(object):
62 def __init__(self, report, opener, journal, after=None, createmode=None,
62 def __init__(self, report, opener, journal, after=None, createmode=None,
63 onclose=None, onabort=None):
63 onclose=None, onabort=None):
64 """Begin a new transaction
64 """Begin a new transaction
65
65
66 Begins a new transaction that allows rolling back writes in the event of
66 Begins a new transaction that allows rolling back writes in the event of
67 an exception.
67 an exception.
68
68
69 * `after`: called after the transaction has been committed
69 * `after`: called after the transaction has been committed
70 * `createmode`: the mode of the journal file that will be created
70 * `createmode`: the mode of the journal file that will be created
71 * `onclose`: called as the transaction is closing, but before it is
71 * `onclose`: called as the transaction is closing, but before it is
72 closed
72 closed
73 * `onabort`: called as the transaction is aborting, but before any files
73 * `onabort`: called as the transaction is aborting, but before any files
74 have been truncated
74 have been truncated
75 """
75 """
76 self.count = 1
76 self.count = 1
77 self.usages = 1
77 self.usages = 1
78 self.report = report
78 self.report = report
79 self.opener = opener
79 self.opener = opener
80 self.after = after
80 self.after = after
81 self.onclose = onclose
81 self.onclose = onclose
82 self.onabort = onabort
82 self.onabort = onabort
83 self.entries = []
83 self.entries = []
84 self.backupentries = []
84 self.backupentries = []
85 self.map = {}
85 self.map = {}
86 self.backupmap = {}
86 self.backupmap = {}
87 self.journal = journal
87 self.journal = journal
88 self._queue = []
88 self._queue = []
89 # a dict of arguments to be passed to hooks
89 # a dict of arguments to be passed to hooks
90 self.hookargs = {}
90 self.hookargs = {}
91
91
92 self.backupjournal = "%s.backupfiles" % journal
92 self.backupjournal = "%s.backupfiles" % journal
93 self.file = opener.open(self.journal, "w")
93 self.file = opener.open(self.journal, "w")
94 self.backupsfile = opener.open(self.backupjournal, 'w')
94 self.backupsfile = opener.open(self.backupjournal, 'w')
95 if createmode is not None:
95 if createmode is not None:
96 opener.chmod(self.journal, createmode & 0666)
96 opener.chmod(self.journal, createmode & 0666)
97 opener.chmod(self.backupjournal, createmode & 0666)
97 opener.chmod(self.backupjournal, createmode & 0666)
98
98
99 def __del__(self):
99 def __del__(self):
100 if self.journal:
100 if self.journal:
101 self._abort()
101 self._abort()
102
102
103 @active
103 @active
104 def startgroup(self):
104 def startgroup(self):
105 self._queue.append(([], []))
105 self._queue.append(([], []))
106
106
107 @active
107 @active
108 def endgroup(self):
108 def endgroup(self):
109 q = self._queue.pop()
109 q = self._queue.pop()
110 self.entries.extend(q[0])
110 self.entries.extend(q[0])
111 self.backupentries.extend(q[1])
111 self.backupentries.extend(q[1])
112
112
113 offsets = []
113 offsets = []
114 backups = []
114 backups = []
115 for f, o, _ in q[0]:
115 for f, o, _ in q[0]:
116 offsets.append((f, o))
116 offsets.append((f, o))
117
117
118 for f, b, _ in q[1]:
118 for f, b, _ in q[1]:
119 backups.append((f, b))
119 backups.append((f, b))
120
120
121 d = ''.join(['%s\0%d\n' % (f, o) for f, o in offsets])
121 d = ''.join(['%s\0%d\n' % (f, o) for f, o in offsets])
122 self.file.write(d)
122 self.file.write(d)
123 self.file.flush()
123 self.file.flush()
124
124
125 d = ''.join(['%s\0%s\0' % (f, b) for f, b in backups])
125 d = ''.join(['%s\0%s\0' % (f, b) for f, b in backups])
126 self.backupsfile.write(d)
126 self.backupsfile.write(d)
127 self.backupsfile.flush()
127 self.backupsfile.flush()
128
128
129 @active
129 @active
130 def add(self, file, offset, data=None):
130 def add(self, file, offset, data=None):
131 if file in self.map or file in self.backupmap:
131 if file in self.map or file in self.backupmap:
132 return
132 return
133 if self._queue:
133 if self._queue:
134 self._queue[-1][0].append((file, offset, data))
134 self._queue[-1][0].append((file, offset, data))
135 return
135 return
136
136
137 self.entries.append((file, offset, data))
137 self.entries.append((file, offset, data))
138 self.map[file] = len(self.entries) - 1
138 self.map[file] = len(self.entries) - 1
139 # add enough data to the journal to do the truncate
139 # add enough data to the journal to do the truncate
140 self.file.write("%s\0%d\n" % (file, offset))
140 self.file.write("%s\0%d\n" % (file, offset))
141 self.file.flush()
141 self.file.flush()
142
142
143 @active
143 @active
144 def addbackup(self, file, hardlink=True):
144 def addbackup(self, file, hardlink=True):
145 """Adds a backup of the file to the transaction
145 """Adds a backup of the file to the transaction
146
146
147 Calling addbackup() creates a hardlink backup of the specified file
147 Calling addbackup() creates a hardlink backup of the specified file
148 that is used to recover the file in the event of the transaction
148 that is used to recover the file in the event of the transaction
149 aborting.
149 aborting.
150
150
151 * `file`: the file path, relative to .hg/store
151 * `file`: the file path, relative to .hg/store
152 * `hardlink`: use a hardlink to quickly create the backup
152 * `hardlink`: use a hardlink to quickly create the backup
153 """
153 """
154
154
155 if file in self.map or file in self.backupmap:
155 if file in self.map or file in self.backupmap:
156 return
156 return
157 backupfile = "journal.%s" % file
157 backupfile = "journal.%s" % file
158 if self.opener.exists(file):
158 if self.opener.exists(file):
159 filepath = self.opener.join(file)
159 filepath = self.opener.join(file)
160 backuppath = self.opener.join(backupfile)
160 backuppath = self.opener.join(backupfile)
161 util.copyfiles(filepath, backuppath, hardlink=hardlink)
161 util.copyfiles(filepath, backuppath, hardlink=hardlink)
162 else:
162 else:
163 self.add(file, 0)
163 self.add(file, 0)
164 return
164 return
165
165
166 if self._queue:
166 if self._queue:
167 self._queue[-1][1].append((file, backupfile))
167 self._queue[-1][1].append((file, backupfile))
168 return
168 return
169
169
170 self.backupentries.append((file, backupfile, None))
170 self.backupentries.append((file, backupfile, None))
171 self.backupmap[file] = len(self.backupentries) - 1
171 self.backupmap[file] = len(self.backupentries) - 1
172 self.backupsfile.write("%s\0%s\0" % (file, backupfile))
172 self.backupsfile.write("%s\0%s\0" % (file, backupfile))
173 self.backupsfile.flush()
173 self.backupsfile.flush()
174
174
175 @active
175 @active
176 def find(self, file):
176 def find(self, file):
177 if file in self.map:
177 if file in self.map:
178 return self.entries[self.map[file]]
178 return self.entries[self.map[file]]
179 if file in self.backupmap:
179 if file in self.backupmap:
180 return self.backupentries[self.backupmap[file]]
180 return self.backupentries[self.backupmap[file]]
181 return None
181 return None
182
182
183 @active
183 @active
184 def replace(self, file, offset, data=None):
184 def replace(self, file, offset, data=None):
185 '''
185 '''
186 replace can only replace already committed entries
186 replace can only replace already committed entries
187 that are not pending in the queue
187 that are not pending in the queue
188 '''
188 '''
189
189
190 if file not in self.map:
190 if file not in self.map:
191 raise KeyError(file)
191 raise KeyError(file)
192 index = self.map[file]
192 index = self.map[file]
193 self.entries[index] = (file, offset, data)
193 self.entries[index] = (file, offset, data)
194 self.file.write("%s\0%d\n" % (file, offset))
194 self.file.write("%s\0%d\n" % (file, offset))
195 self.file.flush()
195 self.file.flush()
196
196
197 @active
197 @active
198 def nest(self):
198 def nest(self):
199 self.count += 1
199 self.count += 1
200 self.usages += 1
200 self.usages += 1
201 return self
201 return self
202
202
203 def release(self):
203 def release(self):
204 if self.count > 0:
204 if self.count > 0:
205 self.usages -= 1
205 self.usages -= 1
206 # if the transaction scopes are left without being closed, fail
206 # if the transaction scopes are left without being closed, fail
207 if self.count > 0 and self.usages == 0:
207 if self.count > 0 and self.usages == 0:
208 self._abort()
208 self._abort()
209
209
210 def running(self):
210 def running(self):
211 return self.count > 0
211 return self.count > 0
212
212
213 @active
213 @active
214 def close(self):
214 def close(self):
215 '''commit the transaction'''
215 '''commit the transaction'''
216 if self.count == 1 and self.onclose is not None:
216 if self.count == 1 and self.onclose is not None:
217 self.onclose()
217 self.onclose()
218
218
219 self.count -= 1
219 self.count -= 1
220 if self.count != 0:
220 if self.count != 0:
221 return
221 return
222 self.file.close()
222 self.file.close()
223 self.backupsfile.close()
223 self.entries = []
224 self.entries = []
224 if self.after:
225 if self.after:
225 self.after()
226 self.after()
226 if self.opener.isfile(self.journal):
227 if self.opener.isfile(self.journal):
227 self.opener.unlink(self.journal)
228 self.opener.unlink(self.journal)
228 if self.opener.isfile(self.backupjournal):
229 if self.opener.isfile(self.backupjournal):
229 self.opener.unlink(self.backupjournal)
230 self.opener.unlink(self.backupjournal)
230 for f, b, _ in self.backupentries:
231 for f, b, _ in self.backupentries:
231 self.opener.unlink(b)
232 self.opener.unlink(b)
232 self.backupentries = []
233 self.backupentries = []
233 self.journal = None
234 self.journal = None
234
235
235 @active
236 @active
236 def abort(self):
237 def abort(self):
237 '''abort the transaction (generally called on error, or when the
238 '''abort the transaction (generally called on error, or when the
238 transaction is not explicitly committed before going out of
239 transaction is not explicitly committed before going out of
239 scope)'''
240 scope)'''
240 self._abort()
241 self._abort()
241
242
242 def _abort(self):
243 def _abort(self):
243 self.count = 0
244 self.count = 0
244 self.usages = 0
245 self.usages = 0
245 self.file.close()
246 self.file.close()
247 self.backupsfile.close()
246
248
247 if self.onabort is not None:
249 if self.onabort is not None:
248 self.onabort()
250 self.onabort()
249
251
250 try:
252 try:
251 if not self.entries and not self.backupentries:
253 if not self.entries and not self.backupentries:
252 if self.journal:
254 if self.journal:
253 self.opener.unlink(self.journal)
255 self.opener.unlink(self.journal)
254 if self.backupjournal:
256 if self.backupjournal:
255 self.opener.unlink(self.backupjournal)
257 self.opener.unlink(self.backupjournal)
256 return
258 return
257
259
258 self.report(_("transaction abort!\n"))
260 self.report(_("transaction abort!\n"))
259
261
260 try:
262 try:
261 _playback(self.journal, self.report, self.opener,
263 _playback(self.journal, self.report, self.opener,
262 self.entries, self.backupentries, False)
264 self.entries, self.backupentries, False)
263 self.report(_("rollback completed\n"))
265 self.report(_("rollback completed\n"))
264 except Exception:
266 except Exception:
265 self.report(_("rollback failed - please run hg recover\n"))
267 self.report(_("rollback failed - please run hg recover\n"))
266 finally:
268 finally:
267 self.journal = None
269 self.journal = None
268
270
269
271
270 def rollback(opener, file, report):
272 def rollback(opener, file, report):
271 """Rolls back the transaction contained in the given file
273 """Rolls back the transaction contained in the given file
272
274
273 Reads the entries in the specified file, and the corresponding
275 Reads the entries in the specified file, and the corresponding
274 '*.backupfiles' file, to recover from an incomplete transaction.
276 '*.backupfiles' file, to recover from an incomplete transaction.
275
277
276 * `file`: a file containing a list of entries, specifying where
278 * `file`: a file containing a list of entries, specifying where
277 to truncate each file. The file should contain a list of
279 to truncate each file. The file should contain a list of
278 file\0offset pairs, delimited by newlines. The corresponding
280 file\0offset pairs, delimited by newlines. The corresponding
279 '*.backupfiles' file should contain a list of file\0backupfile
281 '*.backupfiles' file should contain a list of file\0backupfile
280 pairs, delimited by \0.
282 pairs, delimited by \0.
281 """
283 """
282 entries = []
284 entries = []
283 backupentries = []
285 backupentries = []
284
286
285 fp = opener.open(file)
287 fp = opener.open(file)
286 lines = fp.readlines()
288 lines = fp.readlines()
287 fp.close()
289 fp.close()
288 for l in lines:
290 for l in lines:
289 try:
291 try:
290 f, o = l.split('\0')
292 f, o = l.split('\0')
291 entries.append((f, int(o), None))
293 entries.append((f, int(o), None))
292 except ValueError:
294 except ValueError:
293 report(_("couldn't read journal entry %r!\n") % l)
295 report(_("couldn't read journal entry %r!\n") % l)
294
296
295 backupjournal = "%s.backupfiles" % file
297 backupjournal = "%s.backupfiles" % file
296 if opener.exists(backupjournal):
298 if opener.exists(backupjournal):
297 fp = opener.open(backupjournal)
299 fp = opener.open(backupjournal)
298 data = fp.read()
300 data = fp.read()
299 if len(data) > 0:
301 if len(data) > 0:
300 parts = data.split('\0')
302 parts = data.split('\0')
301 for i in xrange(0, len(parts), 2):
303 for i in xrange(0, len(parts), 2):
302 f, b = parts[i:i + 1]
304 f, b = parts[i:i + 1]
303 backupentries.append((f, b, None))
305 backupentries.append((f, b, None))
304
306
305 _playback(file, report, opener, entries, backupentries)
307 _playback(file, report, opener, entries, backupentries)
General Comments 0
You need to be logged in to leave comments. Login now