remotefilelog: use progress helper in basestore...
Martin von Zweigbergk
r40876:ad21fbcb default
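In summary, this change moves basestore from the old callback-style ui.progress() API to the ui.makeprogress() helper, which returns a progress object. The sketch below only illustrates that pattern and is not code from the commit; cleanup_oldstyle and cleanup_newstyle are hypothetical names, but every call shown (ui.progress, ui.makeprogress, update, complete) appears in the diff that follows.

    from mercurial.i18n import _

    def cleanup_oldstyle(ui, entries):
        # Old API: every report repeats topic/unit/total, and the topic
        # is closed by passing pos=None.
        for count, entry in enumerate(entries):
            ui.progress(_("cleaning up"), count, unit="files",
                        total=len(entries))
        ui.progress(_("cleaning up"), None)

    def cleanup_newstyle(ui, entries):
        # New API: topic, unit and total are bound once to a helper
        # object; the loop only reports the position, and complete()
        # replaces the pos=None idiom.
        progress = ui.makeprogress(_("cleaning up"), unit="files",
                                   total=len(entries))
        for count, entry in enumerate(entries):
            progress.update(count)
        progress.complete()

Binding the topic and total in one place keeps call sites from drifting out of sync and makes the end of a progress topic explicit.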
@@ -1,423 +1,425 @@
 from __future__ import absolute_import
 
 import errno
 import hashlib
 import os
 import shutil
 import stat
 import time
 
 from mercurial.i18n import _
 from mercurial.node import bin, hex
 from mercurial import (
     error,
     pycompat,
     util,
 )
 from . import (
     constants,
     shallowutil,
 )
 
 class basestore(object):
     def __init__(self, repo, path, reponame, shared=False):
         """Creates a remotefilelog store object for the given repo name.
 
         `path` - The file path where this store keeps its data
         `reponame` - The name of the repo. This is used to partition data from
         many repos.
         `shared` - True if this store is a shared cache of data from the central
         server, for many repos on this machine. False means this store is for
         the local data for one repo.
         """
         self.repo = repo
         self.ui = repo.ui
         self._path = path
         self._reponame = reponame
         self._shared = shared
         self._uid = os.getuid() if not pycompat.iswindows else None
 
         self._validatecachelog = self.ui.config("remotefilelog",
                                                 "validatecachelog")
         self._validatecache = self.ui.config("remotefilelog", "validatecache",
                                              'on')
         if self._validatecache not in ('on', 'strict', 'off'):
             self._validatecache = 'on'
         if self._validatecache == 'off':
             self._validatecache = False
 
         if shared:
             shallowutil.mkstickygroupdir(self.ui, path)
 
     def getmissing(self, keys):
         missing = []
         for name, node in keys:
             filepath = self._getfilepath(name, node)
             exists = os.path.exists(filepath)
             if (exists and self._validatecache == 'strict' and
                 not self._validatekey(filepath, 'contains')):
                 exists = False
             if not exists:
                 missing.append((name, node))
 
         return missing
 
     # BELOW THIS ARE IMPLEMENTATIONS OF REPACK SOURCE
 
     def markledger(self, ledger, options=None):
         if options and options.get(constants.OPTION_PACKSONLY):
             return
         if self._shared:
             for filename, nodes in self._getfiles():
                 for node in nodes:
                     ledger.markdataentry(self, filename, node)
                     ledger.markhistoryentry(self, filename, node)
 
     def cleanup(self, ledger):
         ui = self.ui
         entries = ledger.sources.get(self, [])
         count = 0
+        progress = ui.makeprogress(_("cleaning up"), unit="files",
+                                   total=len(entries))
         for entry in entries:
             if entry.gced or (entry.datarepacked and entry.historyrepacked):
-                ui.progress(_("cleaning up"), count, unit="files",
-                            total=len(entries))
+                progress.update(count)
                 path = self._getfilepath(entry.filename, entry.node)
                 util.tryunlink(path)
             count += 1
-        ui.progress(_("cleaning up"), None)
+        progress.complete()
 
         # Clean up the repo cache directory.
         self._cleanupdirectory(self._getrepocachepath())
 
     # BELOW THIS ARE NON-STANDARD APIS
 
     def _cleanupdirectory(self, rootdir):
         """Removes the empty directories and unnecessary files within the root
         directory recursively. Note that this method does not remove the root
         directory itself. """
 
         oldfiles = set()
         otherfiles = set()
         # osutil.listdir returns stat information which saves some rmdir/listdir
         # syscalls.
         for name, mode in util.osutil.listdir(rootdir):
             if stat.S_ISDIR(mode):
                 dirpath = os.path.join(rootdir, name)
                 self._cleanupdirectory(dirpath)
 
                 # Now that the directory specified by dirpath is potentially
                 # empty, try and remove it.
                 try:
                     os.rmdir(dirpath)
                 except OSError:
                     pass
 
             elif stat.S_ISREG(mode):
                 if name.endswith('_old'):
                     oldfiles.add(name[:-4])
                 else:
                     otherfiles.add(name)
 
         # Remove the files which end with suffix '_old' and have no
         # corresponding file without the suffix '_old'. See addremotefilelognode
         # method for the generation/purpose of files with '_old' suffix.
         for filename in oldfiles - otherfiles:
             filepath = os.path.join(rootdir, filename + '_old')
             util.tryunlink(filepath)
 
     def _getfiles(self):
         """Return a list of (filename, [node,...]) for all the revisions that
         exist in the store.
 
         This is useful for obtaining a list of all the contents of the store
         when performing a repack to another store, since the store API requires
         name+node keys and not namehash+node keys.
         """
         existing = {}
         for filenamehash, node in self._listkeys():
             existing.setdefault(filenamehash, []).append(node)
 
         filenamemap = self._resolvefilenames(existing.keys())
 
         for filename, sha in filenamemap.iteritems():
             yield (filename, existing[sha])
 
     def _resolvefilenames(self, hashes):
         """Given a list of filename hashes that are present in the
         remotefilelog store, return a mapping from filename->hash.
 
         This is useful when converting remotefilelog blobs into other storage
         formats.
         """
         if not hashes:
             return {}
 
         filenames = {}
         missingfilename = set(hashes)
 
         # Start with a full manifest, since it'll cover the majority of files
         for filename in self.repo['tip'].manifest():
             sha = hashlib.sha1(filename).digest()
             if sha in missingfilename:
                 filenames[filename] = sha
                 missingfilename.discard(sha)
 
         # Scan the changelog until we've found every file name
         cl = self.repo.unfiltered().changelog
         for rev in pycompat.xrange(len(cl) - 1, -1, -1):
             if not missingfilename:
                 break
             files = cl.readfiles(cl.node(rev))
             for filename in files:
                 sha = hashlib.sha1(filename).digest()
                 if sha in missingfilename:
                     filenames[filename] = sha
                     missingfilename.discard(sha)
 
         return filenames
 
     def _getrepocachepath(self):
         return os.path.join(
             self._path, self._reponame) if self._shared else self._path
 
     def _listkeys(self):
         """List all the remotefilelog keys that exist in the store.
 
         Returns a iterator of (filename hash, filecontent hash) tuples.
         """
 
         for root, dirs, files in os.walk(self._getrepocachepath()):
             for filename in files:
                 if len(filename) != 40:
                     continue
                 node = filename
                 if self._shared:
                     # .../1a/85ffda..be21
                     filenamehash = root[-41:-39] + root[-38:]
                 else:
                     filenamehash = root[-40:]
                 yield (bin(filenamehash), bin(node))
 
     def _getfilepath(self, name, node):
         node = hex(node)
         if self._shared:
             key = shallowutil.getcachekey(self._reponame, name, node)
         else:
             key = shallowutil.getlocalkey(name, node)
 
         return os.path.join(self._path, key)
 
     def _getdata(self, name, node):
         filepath = self._getfilepath(name, node)
         try:
             data = shallowutil.readfile(filepath)
             if self._validatecache and not self._validatedata(data, filepath):
                 if self._validatecachelog:
                     with open(self._validatecachelog, 'a+') as f:
                         f.write("corrupt %s during read\n" % filepath)
                 os.rename(filepath, filepath + ".corrupt")
                 raise KeyError("corrupt local cache file %s" % filepath)
         except IOError:
             raise KeyError("no file found at %s for %s:%s" % (filepath, name,
                                                               hex(node)))
 
         return data
 
     def addremotefilelognode(self, name, node, data):
         filepath = self._getfilepath(name, node)
 
         oldumask = os.umask(0o002)
         try:
             # if this node already exists, save the old version for
             # recovery/debugging purposes.
             if os.path.exists(filepath):
                 newfilename = filepath + '_old'
                 # newfilename can be read-only and shutil.copy will fail.
                 # Delete newfilename to avoid it
                 if os.path.exists(newfilename):
                     shallowutil.unlinkfile(newfilename)
                 shutil.copy(filepath, newfilename)
 
             shallowutil.mkstickygroupdir(self.ui, os.path.dirname(filepath))
             shallowutil.writefile(filepath, data, readonly=True)
 
             if self._validatecache:
                 if not self._validatekey(filepath, 'write'):
                     raise error.Abort(_("local cache write was corrupted %s") %
                                       filepath)
         finally:
             os.umask(oldumask)
 
     def markrepo(self, path):
         """Call this to add the given repo path to the store's list of
         repositories that are using it. This is useful later when doing garbage
         collection, since it allows us to insecpt the repos to see what nodes
         they want to be kept alive in the store.
         """
         repospath = os.path.join(self._path, "repos")
         with open(repospath, 'ab') as reposfile:
             reposfile.write(os.path.dirname(path) + "\n")
 
         repospathstat = os.stat(repospath)
         if repospathstat.st_uid == self._uid:
             os.chmod(repospath, 0o0664)
 
     def _validatekey(self, path, action):
         with open(path, 'rb') as f:
             data = f.read()
 
         if self._validatedata(data, path):
             return True
 
         if self._validatecachelog:
             with open(self._validatecachelog, 'ab+') as f:
                 f.write("corrupt %s during %s\n" % (path, action))
 
         os.rename(path, path + ".corrupt")
         return False
 
     def _validatedata(self, data, path):
         try:
             if len(data) > 0:
                 # see remotefilelogserver.createfileblob for the format
                 offset, size, flags = shallowutil.parsesizeflags(data)
                 if len(data) <= size:
                     # it is truncated
                     return False
 
                 # extract the node from the metadata
                 offset += size
                 datanode = data[offset:offset + 20]
 
                 # and compare against the path
                 if os.path.basename(path) == hex(datanode):
                     # Content matches the intended path
                     return True
                 return False
         except (ValueError, RuntimeError):
             pass
 
         return False
 
     def gc(self, keepkeys):
         ui = self.ui
         cachepath = self._path
-        _removing = _("removing unnecessary files")
-        _truncating = _("enforcing cache limit")
 
         # prune cache
         import Queue
         queue = Queue.PriorityQueue()
         originalsize = 0
         size = 0
         count = 0
         removed = 0
 
         # keep files newer than a day even if they aren't needed
         limit = time.time() - (60 * 60 * 24)
 
-        ui.progress(_removing, count, unit="files")
+        progress = ui.makeprogress(_("removing unnecessary files"),
+                                   unit="files")
+        progress.update(0)
         for root, dirs, files in os.walk(cachepath):
             for file in files:
                 if file == 'repos':
                     continue
 
                 # Don't delete pack files
                 if '/packs/' in root:
                     continue
 
-                ui.progress(_removing, count, unit="files")
+                progress.update(count)
                 path = os.path.join(root, file)
                 key = os.path.relpath(path, cachepath)
                 count += 1
                 try:
                     pathstat = os.stat(path)
                 except OSError as e:
                     # errno.ENOENT = no such file or directory
                     if e.errno != errno.ENOENT:
                         raise
                     msg = _("warning: file %s was removed by another process\n")
                     ui.warn(msg % path)
                     continue
 
                 originalsize += pathstat.st_size
 
                 if key in keepkeys or pathstat.st_atime > limit:
                     queue.put((pathstat.st_atime, path, pathstat))
                     size += pathstat.st_size
                 else:
                     try:
                         shallowutil.unlinkfile(path)
                     except OSError as e:
                         # errno.ENOENT = no such file or directory
                         if e.errno != errno.ENOENT:
                             raise
                         msg = _("warning: file %s was removed by another "
                                 "process\n")
                         ui.warn(msg % path)
                         continue
                     removed += 1
-        ui.progress(_removing, None)
+        progress.complete()
 
         # remove oldest files until under limit
         limit = ui.configbytes("remotefilelog", "cachelimit")
         if size > limit:
             excess = size - limit
+            progress = ui.makeprogress(_("enforcing cache limit"), unit="bytes",
+                                       total=excess)
             removedexcess = 0
             while queue and size > limit and size > 0:
-                ui.progress(_truncating, removedexcess, unit="bytes",
-                            total=excess)
+                progress.update(removedexcess)
                 atime, oldpath, oldpathstat = queue.get()
                 try:
                     shallowutil.unlinkfile(oldpath)
                 except OSError as e:
                     # errno.ENOENT = no such file or directory
                     if e.errno != errno.ENOENT:
                         raise
                     msg = _("warning: file %s was removed by another process\n")
                     ui.warn(msg % oldpath)
                 size -= oldpathstat.st_size
                 removed += 1
                 removedexcess += oldpathstat.st_size
-            ui.progress(_truncating, None)
+            progress.complete()
 
         ui.status(_("finished: removed %s of %s files (%0.2f GB to %0.2f GB)\n")
                   % (removed, count,
                      float(originalsize) / 1024.0 / 1024.0 / 1024.0,
                      float(size) / 1024.0 / 1024.0 / 1024.0))
 
 class baseunionstore(object):
     def __init__(self, *args, **kwargs):
         # If one of the functions that iterates all of the stores is about to
         # throw a KeyError, try this many times with a full refresh between
         # attempts. A repack operation may have moved data from one store to
         # another while we were running.
         self.numattempts = kwargs.get(r'numretries', 0) + 1
         # If not-None, call this function on every retry and if the attempts are
         # exhausted.
         self.retrylog = kwargs.get(r'retrylog', None)
 
     def markforrefresh(self):
         for store in self.stores:
             if util.safehasattr(store, 'markforrefresh'):
                 store.markforrefresh()
 
     @staticmethod
     def retriable(fn):
         def noop(*args):
             pass
         def wrapped(self, *args, **kwargs):
             retrylog = self.retrylog or noop
             funcname = fn.__name__
             for i in pycompat.xrange(self.numattempts):
                 if i > 0:
                     retrylog('re-attempting (n=%d) %s\n' % (i, funcname))
                     self.markforrefresh()
                 try:
                     return fn(self, *args, **kwargs)
                 except KeyError:
                     pass
             # retries exhausted
             retrylog('retries exhausted in %s, raising KeyError\n' % funcname)
             raise
         return wrapped
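Aside from the progress changes, the diff also shows baseunionstore.retriable, which retries an operation after markforrefresh() because a concurrent repack may have moved data between stores mid-call. The following is a minimal sketch of how a subclass might use it; unionexample and getdata are hypothetical names for illustration only and are not part of this file.

    class unionexample(baseunionstore):
        def __init__(self, *stores, **kwargs):
            super(unionexample, self).__init__(*stores, **kwargs)
            self.stores = stores

        @baseunionstore.retriable
        def getdata(self, name, node):
            # A KeyError here makes retriable() refresh the stores and try
            # again, up to numretries + 1 attempts in total.
            for store in self.stores:
                try:
                    return store._getdata(name, node)
                except KeyError:
                    pass
            raise KeyError((name, node))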