remotefilelog: use progress helper in basestore...
Martin von Zweigbergk
r40876:ad21fbcb default
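For context, this commit replaces direct ui.progress() calls with the ui.makeprogress() helper, which returns a progress object that remembers its topic, unit, and total. A minimal sketch of the before/after pattern, assuming a hypothetical entries list and process() helper (not code from this file):

    # Old style: every call repeats the topic/unit/total, and the bar
    # is closed by passing a position of None.
    for i, entry in enumerate(entries):
        ui.progress(_("cleaning up"), i, unit="files", total=len(entries))
        process(entry)
    ui.progress(_("cleaning up"), None)

    # New style: create the progress object once, then update/complete it.
    progress = ui.makeprogress(_("cleaning up"), unit="files",
                               total=len(entries))
    for i, entry in enumerate(entries):
        progress.update(i)
        process(entry)
    progress.complete()

Besides being terser at each call site, the helper keeps the topic and total in one place, so the loop body only reports the changing position.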
@@ -1,423 +1,425 @@
1 1 from __future__ import absolute_import
2 2
3 3 import errno
4 4 import hashlib
5 5 import os
6 6 import shutil
7 7 import stat
8 8 import time
9 9
10 10 from mercurial.i18n import _
11 11 from mercurial.node import bin, hex
12 12 from mercurial import (
13 13 error,
14 14 pycompat,
15 15 util,
16 16 )
17 17 from . import (
18 18 constants,
19 19 shallowutil,
20 20 )
21 21
22 22 class basestore(object):
23 23 def __init__(self, repo, path, reponame, shared=False):
24 24 """Creates a remotefilelog store object for the given repo name.
25 25
26 26 `path` - The file path where this store keeps its data
27 27 `reponame` - The name of the repo. This is used to partition data from
28 28 many repos.
29 29 `shared` - True if this store is a shared cache of data from the central
30 30 server, for many repos on this machine. False means this store is for
31 31 the local data for one repo.
32 32 """
33 33 self.repo = repo
34 34 self.ui = repo.ui
35 35 self._path = path
36 36 self._reponame = reponame
37 37 self._shared = shared
38 38 self._uid = os.getuid() if not pycompat.iswindows else None
39 39
40 40 self._validatecachelog = self.ui.config("remotefilelog",
41 41 "validatecachelog")
42 42 self._validatecache = self.ui.config("remotefilelog", "validatecache",
43 43 'on')
44 44 if self._validatecache not in ('on', 'strict', 'off'):
45 45 self._validatecache = 'on'
46 46 if self._validatecache == 'off':
47 47 self._validatecache = False
48 48
49 49 if shared:
50 50 shallowutil.mkstickygroupdir(self.ui, path)
51 51
52 52 def getmissing(self, keys):
53 53 missing = []
54 54 for name, node in keys:
55 55 filepath = self._getfilepath(name, node)
56 56 exists = os.path.exists(filepath)
57 57 if (exists and self._validatecache == 'strict' and
58 58 not self._validatekey(filepath, 'contains')):
59 59 exists = False
60 60 if not exists:
61 61 missing.append((name, node))
62 62
63 63 return missing
64 64
65 65 # BELOW THIS ARE IMPLEMENTATIONS OF REPACK SOURCE
66 66
67 67 def markledger(self, ledger, options=None):
68 68 if options and options.get(constants.OPTION_PACKSONLY):
69 69 return
70 70 if self._shared:
71 71 for filename, nodes in self._getfiles():
72 72 for node in nodes:
73 73 ledger.markdataentry(self, filename, node)
74 74 ledger.markhistoryentry(self, filename, node)
75 75
76 76 def cleanup(self, ledger):
77 77 ui = self.ui
78 78 entries = ledger.sources.get(self, [])
79 79 count = 0
80 progress = ui.makeprogress(_("cleaning up"), unit="files",
81 total=len(entries))
80 82 for entry in entries:
81 83 if entry.gced or (entry.datarepacked and entry.historyrepacked):
82 ui.progress(_("cleaning up"), count, unit="files",
83 total=len(entries))
84 progress.update(count)
84 85 path = self._getfilepath(entry.filename, entry.node)
85 86 util.tryunlink(path)
86 87 count += 1
87 ui.progress(_("cleaning up"), None)
88 progress.complete()
88 89
89 90 # Clean up the repo cache directory.
90 91 self._cleanupdirectory(self._getrepocachepath())
91 92
92 93 # BELOW THIS ARE NON-STANDARD APIS
93 94
94 95 def _cleanupdirectory(self, rootdir):
95 96 """Removes the empty directories and unnecessary files within the root
96 97 directory recursively. Note that this method does not remove the root
97 98 directory itself. """
98 99
99 100 oldfiles = set()
100 101 otherfiles = set()
101 102 # osutil.listdir returns stat information which saves some rmdir/listdir
102 103 # syscalls.
103 104 for name, mode in util.osutil.listdir(rootdir):
104 105 if stat.S_ISDIR(mode):
105 106 dirpath = os.path.join(rootdir, name)
106 107 self._cleanupdirectory(dirpath)
107 108
108 109 # Now that the directory specified by dirpath is potentially
109 110 # empty, try to remove it.
110 111 try:
111 112 os.rmdir(dirpath)
112 113 except OSError:
113 114 pass
114 115
115 116 elif stat.S_ISREG(mode):
116 117 if name.endswith('_old'):
117 118 oldfiles.add(name[:-4])
118 119 else:
119 120 otherfiles.add(name)
120 121
121 122 # Remove the files which end with suffix '_old' and have no
122 123 # corresponding file without the suffix '_old'. See addremotefilelognode
123 124 # method for the generation/purpose of files with '_old' suffix.
124 125 for filename in oldfiles - otherfiles:
125 126 filepath = os.path.join(rootdir, filename + '_old')
126 127 util.tryunlink(filepath)
127 128
128 129 def _getfiles(self):
129 130 """Return a list of (filename, [node,...]) for all the revisions that
130 131 exist in the store.
131 132
132 133 This is useful for obtaining a list of all the contents of the store
133 134 when performing a repack to another store, since the store API requires
134 135 name+node keys and not namehash+node keys.
135 136 """
136 137 existing = {}
137 138 for filenamehash, node in self._listkeys():
138 139 existing.setdefault(filenamehash, []).append(node)
139 140
140 141 filenamemap = self._resolvefilenames(existing.keys())
141 142
142 143 for filename, sha in filenamemap.iteritems():
143 144 yield (filename, existing[sha])
144 145
145 146 def _resolvefilenames(self, hashes):
146 147 """Given a list of filename hashes that are present in the
147 148 remotefilelog store, return a mapping from filename->hash.
148 149
149 150 This is useful when converting remotefilelog blobs into other storage
150 151 formats.
151 152 """
152 153 if not hashes:
153 154 return {}
154 155
155 156 filenames = {}
156 157 missingfilename = set(hashes)
157 158
158 159 # Start with a full manifest, since it'll cover the majority of files
159 160 for filename in self.repo['tip'].manifest():
160 161 sha = hashlib.sha1(filename).digest()
161 162 if sha in missingfilename:
162 163 filenames[filename] = sha
163 164 missingfilename.discard(sha)
164 165
165 166 # Scan the changelog until we've found every file name
166 167 cl = self.repo.unfiltered().changelog
167 168 for rev in pycompat.xrange(len(cl) - 1, -1, -1):
168 169 if not missingfilename:
169 170 break
170 171 files = cl.readfiles(cl.node(rev))
171 172 for filename in files:
172 173 sha = hashlib.sha1(filename).digest()
173 174 if sha in missingfilename:
174 175 filenames[filename] = sha
175 176 missingfilename.discard(sha)
176 177
177 178 return filenames
178 179
179 180 def _getrepocachepath(self):
180 181 return os.path.join(
181 182 self._path, self._reponame) if self._shared else self._path
182 183
183 184 def _listkeys(self):
184 185 """List all the remotefilelog keys that exist in the store.
185 186
186 187 Returns an iterator of (filename hash, filecontent hash) tuples.
187 188 """
188 189
189 190 for root, dirs, files in os.walk(self._getrepocachepath()):
190 191 for filename in files:
191 192 if len(filename) != 40:
192 193 continue
193 194 node = filename
194 195 if self._shared:
195 196 # .../1a/85ffda..be21
196 197 filenamehash = root[-41:-39] + root[-38:]
197 198 else:
198 199 filenamehash = root[-40:]
199 200 yield (bin(filenamehash), bin(node))
200 201
201 202 def _getfilepath(self, name, node):
202 203 node = hex(node)
203 204 if self._shared:
204 205 key = shallowutil.getcachekey(self._reponame, name, node)
205 206 else:
206 207 key = shallowutil.getlocalkey(name, node)
207 208
208 209 return os.path.join(self._path, key)
209 210
210 211 def _getdata(self, name, node):
211 212 filepath = self._getfilepath(name, node)
212 213 try:
213 214 data = shallowutil.readfile(filepath)
214 215 if self._validatecache and not self._validatedata(data, filepath):
215 216 if self._validatecachelog:
216 217 with open(self._validatecachelog, 'a+') as f:
217 218 f.write("corrupt %s during read\n" % filepath)
218 219 os.rename(filepath, filepath + ".corrupt")
219 220 raise KeyError("corrupt local cache file %s" % filepath)
220 221 except IOError:
221 222 raise KeyError("no file found at %s for %s:%s" % (filepath, name,
222 223 hex(node)))
223 224
224 225 return data
225 226
226 227 def addremotefilelognode(self, name, node, data):
227 228 filepath = self._getfilepath(name, node)
228 229
229 230 oldumask = os.umask(0o002)
230 231 try:
231 232 # if this node already exists, save the old version for
232 233 # recovery/debugging purposes.
233 234 if os.path.exists(filepath):
234 235 newfilename = filepath + '_old'
235 236 # newfilename can be read-only and shutil.copy will fail.
236 237 # Delete newfilename first to avoid that.
237 238 if os.path.exists(newfilename):
238 239 shallowutil.unlinkfile(newfilename)
239 240 shutil.copy(filepath, newfilename)
240 241
241 242 shallowutil.mkstickygroupdir(self.ui, os.path.dirname(filepath))
242 243 shallowutil.writefile(filepath, data, readonly=True)
243 244
244 245 if self._validatecache:
245 246 if not self._validatekey(filepath, 'write'):
246 247 raise error.Abort(_("local cache write was corrupted %s") %
247 248 filepath)
248 249 finally:
249 250 os.umask(oldumask)
250 251
251 252 def markrepo(self, path):
252 253 """Call this to add the given repo path to the store's list of
253 254 repositories that are using it. This is useful later when doing garbage
254 255 collection, since it allows us to inspect the repos to see what nodes
255 256 they want to be kept alive in the store.
256 257 """
257 258 repospath = os.path.join(self._path, "repos")
258 259 with open(repospath, 'ab') as reposfile:
259 260 reposfile.write(os.path.dirname(path) + "\n")
260 261
261 262 repospathstat = os.stat(repospath)
262 263 if repospathstat.st_uid == self._uid:
263 264 os.chmod(repospath, 0o0664)
264 265
265 266 def _validatekey(self, path, action):
266 267 with open(path, 'rb') as f:
267 268 data = f.read()
268 269
269 270 if self._validatedata(data, path):
270 271 return True
271 272
272 273 if self._validatecachelog:
273 274 with open(self._validatecachelog, 'ab+') as f:
274 275 f.write("corrupt %s during %s\n" % (path, action))
275 276
276 277 os.rename(path, path + ".corrupt")
277 278 return False
278 279
279 280 def _validatedata(self, data, path):
280 281 try:
281 282 if len(data) > 0:
282 283 # see remotefilelogserver.createfileblob for the format
283 284 offset, size, flags = shallowutil.parsesizeflags(data)
284 285 if len(data) <= size:
285 286 # it is truncated
286 287 return False
287 288
288 289 # extract the node from the metadata
289 290 offset += size
290 291 datanode = data[offset:offset + 20]
291 292
292 293 # and compare against the path
293 294 if os.path.basename(path) == hex(datanode):
294 295 # Content matches the intended path
295 296 return True
296 297 return False
297 298 except (ValueError, RuntimeError):
298 299 pass
299 300
300 301 return False
301 302
302 303 def gc(self, keepkeys):
303 304 ui = self.ui
304 305 cachepath = self._path
305 _removing = _("removing unnecessary files")
306 _truncating = _("enforcing cache limit")
307 306
308 307 # prune cache
309 308 import Queue
310 309 queue = Queue.PriorityQueue()
311 310 originalsize = 0
312 311 size = 0
313 312 count = 0
314 313 removed = 0
315 314
316 315 # keep files newer than a day even if they aren't needed
317 316 limit = time.time() - (60 * 60 * 24)
318 317
319 ui.progress(_removing, count, unit="files")
318 progress = ui.makeprogress(_("removing unnecessary files"),
319 unit="files")
320 progress.update(0)
320 321 for root, dirs, files in os.walk(cachepath):
321 322 for file in files:
322 323 if file == 'repos':
323 324 continue
324 325
325 326 # Don't delete pack files
326 327 if '/packs/' in root:
327 328 continue
328 329
329 ui.progress(_removing, count, unit="files")
330 progress.update(count)
330 331 path = os.path.join(root, file)
331 332 key = os.path.relpath(path, cachepath)
332 333 count += 1
333 334 try:
334 335 pathstat = os.stat(path)
335 336 except OSError as e:
336 337 # errno.ENOENT = no such file or directory
337 338 if e.errno != errno.ENOENT:
338 339 raise
339 340 msg = _("warning: file %s was removed by another process\n")
340 341 ui.warn(msg % path)
341 342 continue
342 343
343 344 originalsize += pathstat.st_size
344 345
345 346 if key in keepkeys or pathstat.st_atime > limit:
346 347 queue.put((pathstat.st_atime, path, pathstat))
347 348 size += pathstat.st_size
348 349 else:
349 350 try:
350 351 shallowutil.unlinkfile(path)
351 352 except OSError as e:
352 353 # errno.ENOENT = no such file or directory
353 354 if e.errno != errno.ENOENT:
354 355 raise
355 356 msg = _("warning: file %s was removed by another "
356 357 "process\n")
357 358 ui.warn(msg % path)
358 359 continue
359 360 removed += 1
360 ui.progress(_removing, None)
361 progress.complete()
361 362
362 363 # remove oldest files until under limit
363 364 limit = ui.configbytes("remotefilelog", "cachelimit")
364 365 if size > limit:
365 366 excess = size - limit
367 progress = ui.makeprogress(_("enforcing cache limit"), unit="bytes",
368 total=excess)
366 369 removedexcess = 0
367 370 while queue and size > limit and size > 0:
368 ui.progress(_truncating, removedexcess, unit="bytes",
369 total=excess)
371 progress.update(removedexcess)
370 372 atime, oldpath, oldpathstat = queue.get()
371 373 try:
372 374 shallowutil.unlinkfile(oldpath)
373 375 except OSError as e:
374 376 # errno.ENOENT = no such file or directory
375 377 if e.errno != errno.ENOENT:
376 378 raise
377 379 msg = _("warning: file %s was removed by another process\n")
378 380 ui.warn(msg % oldpath)
379 381 size -= oldpathstat.st_size
380 382 removed += 1
381 383 removedexcess += oldpathstat.st_size
382 ui.progress(_truncating, None)
384 progress.complete()
383 385
384 386 ui.status(_("finished: removed %s of %s files (%0.2f GB to %0.2f GB)\n")
385 387 % (removed, count,
386 388 float(originalsize) / 1024.0 / 1024.0 / 1024.0,
387 389 float(size) / 1024.0 / 1024.0 / 1024.0))
388 390
389 391 class baseunionstore(object):
390 392 def __init__(self, *args, **kwargs):
391 393 # If one of the functions that iterates all of the stores is about to
392 394 # throw a KeyError, try this many times with a full refresh between
393 395 # attempts. A repack operation may have moved data from one store to
394 396 # another while we were running.
395 397 self.numattempts = kwargs.get(r'numretries', 0) + 1
396 398 # If not None, call this function on every retry and when the attempts are
397 399 # exhausted.
398 400 self.retrylog = kwargs.get(r'retrylog', None)
399 401
400 402 def markforrefresh(self):
401 403 for store in self.stores:
402 404 if util.safehasattr(store, 'markforrefresh'):
403 405 store.markforrefresh()
404 406
405 407 @staticmethod
406 408 def retriable(fn):
407 409 def noop(*args):
408 410 pass
409 411 def wrapped(self, *args, **kwargs):
410 412 retrylog = self.retrylog or noop
411 413 funcname = fn.__name__
412 414 for i in pycompat.xrange(self.numattempts):
413 415 if i > 0:
414 416 retrylog('re-attempting (n=%d) %s\n' % (i, funcname))
415 417 self.markforrefresh()
416 418 try:
417 419 return fn(self, *args, **kwargs)
418 420 except KeyError:
419 421 pass
420 422 # retries exhausted
421 423 retrylog('retries exhausted in %s, raising KeyError\n' % funcname)
422 424 raise
423 425 return wrapped
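A note on the _resolvefilenames() method in the diff above: the store keys files by sha1(filename), so recovering readable names means hashing candidate names (from the tip manifest, then the changelog) and matching digests. A self-contained sketch of that reverse lookup, with an illustrative candidates iterable standing in for the manifest/changelog scan:

    import hashlib

    def resolvefilenames(hashes, candidates):
        """Map sha1 digests back to filenames by hashing candidate
        names (byte strings) until every digest is accounted for."""
        missing = set(hashes)
        resolved = {}
        for name in candidates:
            if not missing:
                break  # every hash resolved; stop scanning early
            sha = hashlib.sha1(name).digest()
            if sha in missing:
                resolved[name] = sha
                missing.discard(sha)
        return resolved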
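The gc() method's cache trimming is a least-recently-used eviction: every surviving file is queued keyed by its access time, and the oldest entries are popped until the total size fits under remotefilelog.cachelimit. A stripped-down sketch of that loop (using Python 3's queue module; the real code also guards against files being removed concurrently by other processes):

    import queue

    def evict_lru(files, limit):
        """files: iterable of (atime, path, size) tuples. Return the
        paths of the oldest files to delete so total size <= limit."""
        q = queue.PriorityQueue()
        size = 0
        for atime, path, fsize in files:
            q.put((atime, path, fsize))
            size += fsize
        todelete = []
        while not q.empty() and size > limit:
            atime, path, fsize = q.get()  # smallest atime, i.e. oldest
            todelete.append(path)
            size -= fsize
        return todelete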
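Finally, the retriable decorator exists because a concurrent repack can move data between stores while a lookup is iterating them: on KeyError the stores are refreshed and the call is retried, and the error only propagates once the attempts are exhausted. A simplified, self-contained version of the same idea (parameter names are illustrative):

    def retriable(numattempts, refresh):
        """Build a decorator that retries fn on KeyError, calling
        refresh() before each retry."""
        def decorator(fn):
            def wrapped(*args, **kwargs):
                for attempt in range(numattempts):
                    if attempt > 0:
                        refresh()  # e.g. rescan stores after a repack
                    try:
                        return fn(*args, **kwargs)
                    except KeyError:
                        if attempt == numattempts - 1:
                            raise  # retries exhausted
            return wrapped
        return decorator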