py3: add b suffix to make sure file is opened in bytes mode...
Pulkit Goyal
r40650:aa588bf4 default
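The change is mechanical but matters on Python 3: the pack and index temp files receive struct-packed headers and hashed payloads, which are bytes, and a text-mode file object rejects them. A minimal sketch of the failure mode this commit addresses (the temporary file below is hypothetical, not part of the patch):

    import os
    import struct
    import tempfile

    fd, path = tempfile.mkstemp(suffix='-tmp')   # stand-in for opener.mkstemp

    fp = os.fdopen(fd, 'w+')                     # text mode, as before the change
    try:
        fp.write(struct.pack('!B', 2))           # pack headers are bytes -> TypeError on py3
    except TypeError:
        pass
    finally:
        fp.close()

    with open(path, 'wb+') as fp:                # bytes mode, as after the change
        fp.write(struct.pack('!B', 2))           # accepted
    os.unlink(path)

On Python 2 both modes accept the packed header, which is why the missing 'b' went unnoticed until the Python 3 port.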
@@ -1,539 +1,539 @@
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import collections
3 import collections
4 import errno
4 import errno
5 import hashlib
5 import hashlib
6 import mmap
6 import mmap
7 import os
7 import os
8 import struct
8 import struct
9 import time
9 import time
10
10
11 from mercurial.i18n import _
11 from mercurial.i18n import _
12 from mercurial import (
12 from mercurial import (
13 policy,
13 policy,
14 pycompat,
14 pycompat,
15 util,
15 util,
16 vfs as vfsmod,
16 vfs as vfsmod,
17 )
17 )
18 from . import shallowutil
18 from . import shallowutil
19
19
20 osutil = policy.importmod(r'osutil')
20 osutil = policy.importmod(r'osutil')
21
21
22 # The pack version supported by this implementation. This will need to be
22 # The pack version supported by this implementation. This will need to be
23 # rev'd whenever the byte format changes. Ex: changing the fanout prefix,
23 # rev'd whenever the byte format changes. Ex: changing the fanout prefix,
24 # changing any of the int sizes, changing the delta algorithm, etc.
24 # changing any of the int sizes, changing the delta algorithm, etc.
25 PACKVERSIONSIZE = 1
25 PACKVERSIONSIZE = 1
26 INDEXVERSIONSIZE = 2
26 INDEXVERSIONSIZE = 2
27
27
28 FANOUTSTART = INDEXVERSIONSIZE
28 FANOUTSTART = INDEXVERSIONSIZE
29
29
30 # Constant that indicates a fanout table entry hasn't been filled in. (This does
30 # Constant that indicates a fanout table entry hasn't been filled in. (This does
31 # not get serialized)
31 # not get serialized)
32 EMPTYFANOUT = -1
32 EMPTYFANOUT = -1
33
33
34 # The fanout prefix is the number of bytes that can be addressed by the fanout
34 # The fanout prefix is the number of bytes that can be addressed by the fanout
35 # table. Example: a fanout prefix of 1 means we use the first byte of a hash to
35 # table. Example: a fanout prefix of 1 means we use the first byte of a hash to
36 # look in the fanout table (which will be 2^8 entries long).
36 # look in the fanout table (which will be 2^8 entries long).
37 SMALLFANOUTPREFIX = 1
37 SMALLFANOUTPREFIX = 1
38 LARGEFANOUTPREFIX = 2
38 LARGEFANOUTPREFIX = 2
39
39
40 # The number of entries in the index at which point we switch to a large fanout.
40 # The number of entries in the index at which point we switch to a large fanout.
41 # It is chosen to balance the linear scan through a sparse fanout, with the
41 # It is chosen to balance the linear scan through a sparse fanout, with the
42 # size of the bisect in actual index.
42 # size of the bisect in actual index.
43 # 2^16 / 8 was chosen because it trades off (1 step fanout scan + 5 step
43 # 2^16 / 8 was chosen because it trades off (1 step fanout scan + 5 step
44 # bisect) with (8 step fanout scan + 1 step bisect)
44 # bisect) with (8 step fanout scan + 1 step bisect)
45 # 5 step bisect = log(2^16 / 8 / 255) # fanout
45 # 5 step bisect = log(2^16 / 8 / 255) # fanout
46 # 8 step fanout scan = 2^16 / (2^16 / 8) # fanout space divided by entries
46 # 8 step fanout scan = 2^16 / (2^16 / 8) # fanout space divided by entries
47 SMALLFANOUTCUTOFF = 2**16 / 8
47 SMALLFANOUTCUTOFF = 2**16 / 8
48
48
49 # The amount of time to wait between checking for new packs. This prevents an
49 # The amount of time to wait between checking for new packs. This prevents an
50 # exception when data is moved to a new pack after the process has already
50 # exception when data is moved to a new pack after the process has already
51 # loaded the pack list.
51 # loaded the pack list.
52 REFRESHRATE = 0.1
52 REFRESHRATE = 0.1
53
53
54 if pycompat.isposix:
54 if pycompat.isposix:
55 # With glibc 2.7+ the 'e' flag uses O_CLOEXEC when opening.
55 # With glibc 2.7+ the 'e' flag uses O_CLOEXEC when opening.
56 # The 'e' flag will be ignored on older versions of glibc.
56 # The 'e' flag will be ignored on older versions of glibc.
57 PACKOPENMODE = 'rbe'
57 PACKOPENMODE = 'rbe'
58 else:
58 else:
59 PACKOPENMODE = 'rb'
59 PACKOPENMODE = 'rb'
60
60
61 class _cachebackedpacks(object):
61 class _cachebackedpacks(object):
62 def __init__(self, packs, cachesize):
62 def __init__(self, packs, cachesize):
63 self._packs = set(packs)
63 self._packs = set(packs)
64 self._lrucache = util.lrucachedict(cachesize)
64 self._lrucache = util.lrucachedict(cachesize)
65 self._lastpack = None
65 self._lastpack = None
66
66
67 # Avoid cold start of the cache by populating the most recent packs
67 # Avoid cold start of the cache by populating the most recent packs
68 # in the cache.
68 # in the cache.
69 for i in reversed(range(min(cachesize, len(packs)))):
69 for i in reversed(range(min(cachesize, len(packs)))):
70 self._movetofront(packs[i])
70 self._movetofront(packs[i])
71
71
72 def _movetofront(self, pack):
72 def _movetofront(self, pack):
73 # This effectively makes pack the first entry in the cache.
73 # This effectively makes pack the first entry in the cache.
74 self._lrucache[pack] = True
74 self._lrucache[pack] = True
75
75
76 def _registerlastpackusage(self):
76 def _registerlastpackusage(self):
77 if self._lastpack is not None:
77 if self._lastpack is not None:
78 self._movetofront(self._lastpack)
78 self._movetofront(self._lastpack)
79 self._lastpack = None
79 self._lastpack = None
80
80
81 def add(self, pack):
81 def add(self, pack):
82 self._registerlastpackusage()
82 self._registerlastpackusage()
83
83
84 # This method will mostly be called when packs are not in cache.
84 # This method will mostly be called when packs are not in cache.
85 # Therefore, add the pack to the cache.
85 # Therefore, add the pack to the cache.
86 self._movetofront(pack)
86 self._movetofront(pack)
87 self._packs.add(pack)
87 self._packs.add(pack)
88
88
89 def __iter__(self):
89 def __iter__(self):
90 self._registerlastpackusage()
90 self._registerlastpackusage()
91
91
92 # Cache iteration is based on LRU.
92 # Cache iteration is based on LRU.
93 for pack in self._lrucache:
93 for pack in self._lrucache:
94 self._lastpack = pack
94 self._lastpack = pack
95 yield pack
95 yield pack
96
96
97 cachedpacks = set(pack for pack in self._lrucache)
97 cachedpacks = set(pack for pack in self._lrucache)
98 # Yield the packs that are not in the cache.
98 # Yield the packs that are not in the cache.
99 for pack in self._packs - cachedpacks:
99 for pack in self._packs - cachedpacks:
100 self._lastpack = pack
100 self._lastpack = pack
101 yield pack
101 yield pack
102
102
103 # Data not found in any pack.
103 # Data not found in any pack.
104 self._lastpack = None
104 self._lastpack = None
105
105
106 class basepackstore(object):
106 class basepackstore(object):
107 # Default cache size limit for the pack files.
107 # Default cache size limit for the pack files.
108 DEFAULTCACHESIZE = 100
108 DEFAULTCACHESIZE = 100
109
109
110 def __init__(self, ui, path):
110 def __init__(self, ui, path):
111 self.ui = ui
111 self.ui = ui
112 self.path = path
112 self.path = path
113
113
114 # lastrefresh is 0 so we'll immediately check for new packs on the first
114 # lastrefresh is 0 so we'll immediately check for new packs on the first
115 # failure.
115 # failure.
116 self.lastrefresh = 0
116 self.lastrefresh = 0
117
117
118 packs = []
118 packs = []
119 for filepath, __, __ in self._getavailablepackfilessorted():
119 for filepath, __, __ in self._getavailablepackfilessorted():
120 try:
120 try:
121 pack = self.getpack(filepath)
121 pack = self.getpack(filepath)
122 except Exception as ex:
122 except Exception as ex:
123 # An exception may be thrown if the pack file is corrupted
123 # An exception may be thrown if the pack file is corrupted
124 # somehow. Log a warning but keep going in this case, just
124 # somehow. Log a warning but keep going in this case, just
125 # skipping this pack file.
125 # skipping this pack file.
126 #
126 #
127 # If this is an ENOENT error then don't even bother logging.
127 # If this is an ENOENT error then don't even bother logging.
128 # Someone could have removed the file since we retrieved the
128 # Someone could have removed the file since we retrieved the
129 # list of paths.
129 # list of paths.
130 if getattr(ex, 'errno', None) != errno.ENOENT:
130 if getattr(ex, 'errno', None) != errno.ENOENT:
131 ui.warn(_('unable to load pack %s: %s\n') % (filepath, ex))
131 ui.warn(_('unable to load pack %s: %s\n') % (filepath, ex))
132 continue
132 continue
133 packs.append(pack)
133 packs.append(pack)
134
134
135 self.packs = _cachebackedpacks(packs, self.DEFAULTCACHESIZE)
135 self.packs = _cachebackedpacks(packs, self.DEFAULTCACHESIZE)
136
136
137 def _getavailablepackfiles(self):
137 def _getavailablepackfiles(self):
138 """For each pack file (a index/data file combo), yields:
138 """For each pack file (a index/data file combo), yields:
139 (full path without extension, mtime, size)
139 (full path without extension, mtime, size)
140
140
141 mtime will be the mtime of the index/data file (whichever is newer)
141 mtime will be the mtime of the index/data file (whichever is newer)
142 size is the combined size of index/data file
142 size is the combined size of index/data file
143 """
143 """
144 indexsuffixlen = len(self.INDEXSUFFIX)
144 indexsuffixlen = len(self.INDEXSUFFIX)
145 packsuffixlen = len(self.PACKSUFFIX)
145 packsuffixlen = len(self.PACKSUFFIX)
146
146
147 ids = set()
147 ids = set()
148 sizes = collections.defaultdict(lambda: 0)
148 sizes = collections.defaultdict(lambda: 0)
149 mtimes = collections.defaultdict(lambda: [])
149 mtimes = collections.defaultdict(lambda: [])
150 try:
150 try:
151 for filename, type, stat in osutil.listdir(self.path, stat=True):
151 for filename, type, stat in osutil.listdir(self.path, stat=True):
152 id = None
152 id = None
153 if filename[-indexsuffixlen:] == self.INDEXSUFFIX:
153 if filename[-indexsuffixlen:] == self.INDEXSUFFIX:
154 id = filename[:-indexsuffixlen]
154 id = filename[:-indexsuffixlen]
155 elif filename[-packsuffixlen:] == self.PACKSUFFIX:
155 elif filename[-packsuffixlen:] == self.PACKSUFFIX:
156 id = filename[:-packsuffixlen]
156 id = filename[:-packsuffixlen]
157
157
158 # Since we expect to have two files corresponding to each ID
158 # Since we expect to have two files corresponding to each ID
159 # (the index file and the pack file), we can yield once we see
159 # (the index file and the pack file), we can yield once we see
160 # it twice.
160 # it twice.
161 if id:
161 if id:
162 sizes[id] += stat.st_size # Sum both files' sizes together
162 sizes[id] += stat.st_size # Sum both files' sizes together
163 mtimes[id].append(stat.st_mtime)
163 mtimes[id].append(stat.st_mtime)
164 if id in ids:
164 if id in ids:
165 yield (os.path.join(self.path, id), max(mtimes[id]),
165 yield (os.path.join(self.path, id), max(mtimes[id]),
166 sizes[id])
166 sizes[id])
167 else:
167 else:
168 ids.add(id)
168 ids.add(id)
169 except OSError as ex:
169 except OSError as ex:
170 if ex.errno != errno.ENOENT:
170 if ex.errno != errno.ENOENT:
171 raise
171 raise
172
172
173 def _getavailablepackfilessorted(self):
173 def _getavailablepackfilessorted(self):
174 """Like `_getavailablepackfiles`, but also sorts the files by mtime,
174 """Like `_getavailablepackfiles`, but also sorts the files by mtime,
175 yielding newest files first.
175 yielding newest files first.
176
176
177 This is desirable, since newer packfiles are more likely to contain the
177 This is desirable, since newer packfiles are more likely to contain the
178 data we want.
178 data we want.
179 """
179 """
180 files = []
180 files = []
181 for path, mtime, size in self._getavailablepackfiles():
181 for path, mtime, size in self._getavailablepackfiles():
182 files.append((mtime, size, path))
182 files.append((mtime, size, path))
183 files = sorted(files, reverse=True)
183 files = sorted(files, reverse=True)
184 for mtime, size, path in files:
184 for mtime, size, path in files:
185 yield path, mtime, size
185 yield path, mtime, size
186
186
187 def gettotalsizeandcount(self):
187 def gettotalsizeandcount(self):
188 """Returns the total disk size (in bytes) of all the pack files in
188 """Returns the total disk size (in bytes) of all the pack files in
189 this store, and the count of pack files.
189 this store, and the count of pack files.
190
190
191 (This might be smaller than the total size of the ``self.path``
191 (This might be smaller than the total size of the ``self.path``
192 directory, since this only considers fully-written pack files, and not
192 directory, since this only considers fully-written pack files, and not
193 temporary files or other detritus in the directory.)
193 temporary files or other detritus in the directory.)
194 """
194 """
195 totalsize = 0
195 totalsize = 0
196 count = 0
196 count = 0
197 for __, __, size in self._getavailablepackfiles():
197 for __, __, size in self._getavailablepackfiles():
198 totalsize += size
198 totalsize += size
199 count += 1
199 count += 1
200 return totalsize, count
200 return totalsize, count
201
201
202 def getmetrics(self):
202 def getmetrics(self):
203 """Returns metrics on the state of this store."""
203 """Returns metrics on the state of this store."""
204 size, count = self.gettotalsizeandcount()
204 size, count = self.gettotalsizeandcount()
205 return {
205 return {
206 'numpacks': count,
206 'numpacks': count,
207 'totalpacksize': size,
207 'totalpacksize': size,
208 }
208 }
209
209
210 def getpack(self, path):
210 def getpack(self, path):
211 raise NotImplementedError()
211 raise NotImplementedError()
212
212
213 def getmissing(self, keys):
213 def getmissing(self, keys):
214 missing = keys
214 missing = keys
215 for pack in self.packs:
215 for pack in self.packs:
216 missing = pack.getmissing(missing)
216 missing = pack.getmissing(missing)
217
217
218 # Ensures better performance of the cache by keeping the most
218 # Ensures better performance of the cache by keeping the most
219 # recently accessed pack at the beginning in subsequent iterations.
219 # recently accessed pack at the beginning in subsequent iterations.
220 if not missing:
220 if not missing:
221 return missing
221 return missing
222
222
223 if missing:
223 if missing:
224 for pack in self.refresh():
224 for pack in self.refresh():
225 missing = pack.getmissing(missing)
225 missing = pack.getmissing(missing)
226
226
227 return missing
227 return missing
228
228
229 def markledger(self, ledger, options=None):
229 def markledger(self, ledger, options=None):
230 for pack in self.packs:
230 for pack in self.packs:
231 pack.markledger(ledger)
231 pack.markledger(ledger)
232
232
233 def markforrefresh(self):
233 def markforrefresh(self):
234 """Tells the store that there may be new pack files, so the next time it
234 """Tells the store that there may be new pack files, so the next time it
235 has a lookup miss it should check for new files."""
235 has a lookup miss it should check for new files."""
236 self.lastrefresh = 0
236 self.lastrefresh = 0
237
237
238 def refresh(self):
238 def refresh(self):
239 """Checks for any new packs on disk, adds them to the main pack list,
239 """Checks for any new packs on disk, adds them to the main pack list,
240 and returns a list of just the new packs."""
240 and returns a list of just the new packs."""
241 now = time.time()
241 now = time.time()
242
242
243 # If we experience a lot of misses (like in the case of getmissing() on
243 # If we experience a lot of misses (like in the case of getmissing() on
244 # new objects), let's only actually check disk for new stuff every once
244 # new objects), let's only actually check disk for new stuff every once
245 # in a while. Generally this code path should only ever matter when a
245 # in a while. Generally this code path should only ever matter when a
246 # repack is going on in the background, and that should be pretty rare
246 # repack is going on in the background, and that should be pretty rare
247 # to have that happen twice in quick succession.
247 # to have that happen twice in quick succession.
248 newpacks = []
248 newpacks = []
249 if now > self.lastrefresh + REFRESHRATE:
249 if now > self.lastrefresh + REFRESHRATE:
250 self.lastrefresh = now
250 self.lastrefresh = now
251 previous = set(p.path for p in self.packs)
251 previous = set(p.path for p in self.packs)
252 for filepath, __, __ in self._getavailablepackfilessorted():
252 for filepath, __, __ in self._getavailablepackfilessorted():
253 if filepath not in previous:
253 if filepath not in previous:
254 newpack = self.getpack(filepath)
254 newpack = self.getpack(filepath)
255 newpacks.append(newpack)
255 newpacks.append(newpack)
256 self.packs.add(newpack)
256 self.packs.add(newpack)
257
257
258 return newpacks
258 return newpacks
259
259
260 class versionmixin(object):
260 class versionmixin(object):
261 # Mix-in for classes with multiple supported versions
261 # Mix-in for classes with multiple supported versions
262 VERSION = None
262 VERSION = None
263 SUPPORTED_VERSIONS = [2]
263 SUPPORTED_VERSIONS = [2]
264
264
265 def _checkversion(self, version):
265 def _checkversion(self, version):
266 if version in self.SUPPORTED_VERSIONS:
266 if version in self.SUPPORTED_VERSIONS:
267 if self.VERSION is None:
267 if self.VERSION is None:
268 # only affect this instance
268 # only affect this instance
269 self.VERSION = version
269 self.VERSION = version
270 elif self.VERSION != version:
270 elif self.VERSION != version:
271 raise RuntimeError('inconsistent version: %s' % version)
271 raise RuntimeError('inconsistent version: %s' % version)
272 else:
272 else:
273 raise RuntimeError('unsupported version: %s' % version)
273 raise RuntimeError('unsupported version: %s' % version)
274
274
275 class basepack(versionmixin):
275 class basepack(versionmixin):
276 # The maximum amount we should read via mmap before remapping so the old
276 # The maximum amount we should read via mmap before remapping so the old
277 # pages can be released (100MB)
277 # pages can be released (100MB)
278 MAXPAGEDIN = 100 * 1024**2
278 MAXPAGEDIN = 100 * 1024**2
279
279
280 SUPPORTED_VERSIONS = [2]
280 SUPPORTED_VERSIONS = [2]
281
281
282 def __init__(self, path):
282 def __init__(self, path):
283 self.path = path
283 self.path = path
284 self.packpath = path + self.PACKSUFFIX
284 self.packpath = path + self.PACKSUFFIX
285 self.indexpath = path + self.INDEXSUFFIX
285 self.indexpath = path + self.INDEXSUFFIX
286
286
287 self.indexsize = os.stat(self.indexpath).st_size
287 self.indexsize = os.stat(self.indexpath).st_size
288 self.datasize = os.stat(self.packpath).st_size
288 self.datasize = os.stat(self.packpath).st_size
289
289
290 self._index = None
290 self._index = None
291 self._data = None
291 self._data = None
292 self.freememory() # initialize the mmap
292 self.freememory() # initialize the mmap
293
293
294 version = struct.unpack('!B', self._data[:PACKVERSIONSIZE])[0]
294 version = struct.unpack('!B', self._data[:PACKVERSIONSIZE])[0]
295 self._checkversion(version)
295 self._checkversion(version)
296
296
297 version, config = struct.unpack('!BB', self._index[:INDEXVERSIONSIZE])
297 version, config = struct.unpack('!BB', self._index[:INDEXVERSIONSIZE])
298 self._checkversion(version)
298 self._checkversion(version)
299
299
300 if 0b10000000 & config:
300 if 0b10000000 & config:
301 self.params = indexparams(LARGEFANOUTPREFIX, version)
301 self.params = indexparams(LARGEFANOUTPREFIX, version)
302 else:
302 else:
303 self.params = indexparams(SMALLFANOUTPREFIX, version)
303 self.params = indexparams(SMALLFANOUTPREFIX, version)
304
304
305 @util.propertycache
305 @util.propertycache
306 def _fanouttable(self):
306 def _fanouttable(self):
307 params = self.params
307 params = self.params
308 rawfanout = self._index[FANOUTSTART:FANOUTSTART + params.fanoutsize]
308 rawfanout = self._index[FANOUTSTART:FANOUTSTART + params.fanoutsize]
309 fanouttable = []
309 fanouttable = []
310 for i in pycompat.xrange(0, params.fanoutcount):
310 for i in pycompat.xrange(0, params.fanoutcount):
311 loc = i * 4
311 loc = i * 4
312 fanoutentry = struct.unpack('!I', rawfanout[loc:loc + 4])[0]
312 fanoutentry = struct.unpack('!I', rawfanout[loc:loc + 4])[0]
313 fanouttable.append(fanoutentry)
313 fanouttable.append(fanoutentry)
314 return fanouttable
314 return fanouttable
315
315
316 @util.propertycache
316 @util.propertycache
317 def _indexend(self):
317 def _indexend(self):
318 nodecount = struct.unpack_from('!Q', self._index,
318 nodecount = struct.unpack_from('!Q', self._index,
319 self.params.indexstart - 8)[0]
319 self.params.indexstart - 8)[0]
320 return self.params.indexstart + nodecount * self.INDEXENTRYLENGTH
320 return self.params.indexstart + nodecount * self.INDEXENTRYLENGTH
321
321
322 def freememory(self):
322 def freememory(self):
323 """Unmap and remap the memory to free it up after known expensive
323 """Unmap and remap the memory to free it up after known expensive
324 operations. Return True if self._data and self._index were reloaded.
324 operations. Return True if self._data and self._index were reloaded.
325 """
325 """
326 if self._index:
326 if self._index:
327 if self._pagedin < self.MAXPAGEDIN:
327 if self._pagedin < self.MAXPAGEDIN:
328 return False
328 return False
329
329
330 self._index.close()
330 self._index.close()
331 self._data.close()
331 self._data.close()
332
332
333 # TODO: use an opener/vfs to access these paths
333 # TODO: use an opener/vfs to access these paths
334 with open(self.indexpath, PACKOPENMODE) as indexfp:
334 with open(self.indexpath, PACKOPENMODE) as indexfp:
335 # memory-map the file, size 0 means whole file
335 # memory-map the file, size 0 means whole file
336 self._index = mmap.mmap(indexfp.fileno(), 0,
336 self._index = mmap.mmap(indexfp.fileno(), 0,
337 access=mmap.ACCESS_READ)
337 access=mmap.ACCESS_READ)
338 with open(self.packpath, PACKOPENMODE) as datafp:
338 with open(self.packpath, PACKOPENMODE) as datafp:
339 self._data = mmap.mmap(datafp.fileno(), 0, access=mmap.ACCESS_READ)
339 self._data = mmap.mmap(datafp.fileno(), 0, access=mmap.ACCESS_READ)
340
340
341 self._pagedin = 0
341 self._pagedin = 0
342 return True
342 return True
343
343
344 def getmissing(self, keys):
344 def getmissing(self, keys):
345 raise NotImplementedError()
345 raise NotImplementedError()
346
346
347 def markledger(self, ledger, options=None):
347 def markledger(self, ledger, options=None):
348 raise NotImplementedError()
348 raise NotImplementedError()
349
349
350 def cleanup(self, ledger):
350 def cleanup(self, ledger):
351 raise NotImplementedError()
351 raise NotImplementedError()
352
352
353 def __iter__(self):
353 def __iter__(self):
354 raise NotImplementedError()
354 raise NotImplementedError()
355
355
356 def iterentries(self):
356 def iterentries(self):
357 raise NotImplementedError()
357 raise NotImplementedError()
358
358
359 class mutablebasepack(versionmixin):
359 class mutablebasepack(versionmixin):
360
360
361 def __init__(self, ui, packdir, version=2):
361 def __init__(self, ui, packdir, version=2):
362 self._checkversion(version)
362 self._checkversion(version)
363 # TODO(augie): make this configurable
363 # TODO(augie): make this configurable
364 self._compressor = 'GZ'
364 self._compressor = 'GZ'
365 opener = vfsmod.vfs(packdir)
365 opener = vfsmod.vfs(packdir)
366 opener.createmode = 0o444
366 opener.createmode = 0o444
367 self.opener = opener
367 self.opener = opener
368
368
369 self.entries = {}
369 self.entries = {}
370
370
371 shallowutil.mkstickygroupdir(ui, packdir)
371 shallowutil.mkstickygroupdir(ui, packdir)
372 self.packfp, self.packpath = opener.mkstemp(
372 self.packfp, self.packpath = opener.mkstemp(
373 suffix=self.PACKSUFFIX + '-tmp')
373 suffix=self.PACKSUFFIX + '-tmp')
374 self.idxfp, self.idxpath = opener.mkstemp(
374 self.idxfp, self.idxpath = opener.mkstemp(
375 suffix=self.INDEXSUFFIX + '-tmp')
375 suffix=self.INDEXSUFFIX + '-tmp')
376 - self.packfp = os.fdopen(self.packfp, r'w+')
376 + self.packfp = os.fdopen(self.packfp, r'wb+')
377 - self.idxfp = os.fdopen(self.idxfp, r'w+')
377 + self.idxfp = os.fdopen(self.idxfp, r'wb+')
378 self.sha = hashlib.sha1()
378 self.sha = hashlib.sha1()
379 self._closed = False
379 self._closed = False
380
380
381 # The opener provides no way of doing permission fixup on files created
381 # The opener provides no way of doing permission fixup on files created
382 # via mkstemp, so we must fix it ourselves. We can probably fix this
382 # via mkstemp, so we must fix it ourselves. We can probably fix this
383 # upstream in vfs.mkstemp so we don't need to use the private method.
383 # upstream in vfs.mkstemp so we don't need to use the private method.
384 opener._fixfilemode(opener.join(self.packpath))
384 opener._fixfilemode(opener.join(self.packpath))
385 opener._fixfilemode(opener.join(self.idxpath))
385 opener._fixfilemode(opener.join(self.idxpath))
386
386
387 # Write header
387 # Write header
388 # TODO: make it extensible (ex: allow specifying compression algorithm,
388 # TODO: make it extensible (ex: allow specifying compression algorithm,
389 # a flexible key/value header, delta algorithm, fanout size, etc)
389 # a flexible key/value header, delta algorithm, fanout size, etc)
390 versionbuf = struct.pack('!B', self.VERSION) # unsigned 1 byte int
390 versionbuf = struct.pack('!B', self.VERSION) # unsigned 1 byte int
391 self.writeraw(versionbuf)
391 self.writeraw(versionbuf)
392
392
393 def __enter__(self):
393 def __enter__(self):
394 return self
394 return self
395
395
396 def __exit__(self, exc_type, exc_value, traceback):
396 def __exit__(self, exc_type, exc_value, traceback):
397 if exc_type is None:
397 if exc_type is None:
398 self.close()
398 self.close()
399 else:
399 else:
400 self.abort()
400 self.abort()
401
401
402 def abort(self):
402 def abort(self):
403 # Unclean exit
403 # Unclean exit
404 self._cleantemppacks()
404 self._cleantemppacks()
405
405
406 def writeraw(self, data):
406 def writeraw(self, data):
407 self.packfp.write(data)
407 self.packfp.write(data)
408 self.sha.update(data)
408 self.sha.update(data)
409
409
410 def close(self, ledger=None):
410 def close(self, ledger=None):
411 if self._closed:
411 if self._closed:
412 return
412 return
413
413
414 try:
414 try:
415 sha = self.sha.hexdigest()
415 sha = self.sha.hexdigest()
416 self.packfp.close()
416 self.packfp.close()
417 self.writeindex()
417 self.writeindex()
418
418
419 if len(self.entries) == 0:
419 if len(self.entries) == 0:
420 # Empty pack
420 # Empty pack
421 self._cleantemppacks()
421 self._cleantemppacks()
422 self._closed = True
422 self._closed = True
423 return None
423 return None
424
424
425 self.opener.rename(self.packpath, sha + self.PACKSUFFIX)
425 self.opener.rename(self.packpath, sha + self.PACKSUFFIX)
426 try:
426 try:
427 self.opener.rename(self.idxpath, sha + self.INDEXSUFFIX)
427 self.opener.rename(self.idxpath, sha + self.INDEXSUFFIX)
428 except Exception as ex:
428 except Exception as ex:
429 try:
429 try:
430 self.opener.unlink(sha + self.PACKSUFFIX)
430 self.opener.unlink(sha + self.PACKSUFFIX)
431 except Exception:
431 except Exception:
432 pass
432 pass
433 # Throw exception 'ex' explicitly since a normal 'raise' would
433 # Throw exception 'ex' explicitly since a normal 'raise' would
434 # potentially throw an exception from the unlink cleanup.
434 # potentially throw an exception from the unlink cleanup.
435 raise ex
435 raise ex
436 except Exception:
436 except Exception:
437 # Clean up temp packs in all exception cases
437 # Clean up temp packs in all exception cases
438 self._cleantemppacks()
438 self._cleantemppacks()
439 raise
439 raise
440
440
441 self._closed = True
441 self._closed = True
442 result = self.opener.join(sha)
442 result = self.opener.join(sha)
443 if ledger:
443 if ledger:
444 ledger.addcreated(result)
444 ledger.addcreated(result)
445 return result
445 return result
446
446
447 def _cleantemppacks(self):
447 def _cleantemppacks(self):
448 try:
448 try:
449 self.opener.unlink(self.packpath)
449 self.opener.unlink(self.packpath)
450 except Exception:
450 except Exception:
451 pass
451 pass
452 try:
452 try:
453 self.opener.unlink(self.idxpath)
453 self.opener.unlink(self.idxpath)
454 except Exception:
454 except Exception:
455 pass
455 pass
456
456
457 def writeindex(self):
457 def writeindex(self):
458 rawindex = ''
458 rawindex = ''
459
459
460 largefanout = len(self.entries) > SMALLFANOUTCUTOFF
460 largefanout = len(self.entries) > SMALLFANOUTCUTOFF
461 if largefanout:
461 if largefanout:
462 params = indexparams(LARGEFANOUTPREFIX, self.VERSION)
462 params = indexparams(LARGEFANOUTPREFIX, self.VERSION)
463 else:
463 else:
464 params = indexparams(SMALLFANOUTPREFIX, self.VERSION)
464 params = indexparams(SMALLFANOUTPREFIX, self.VERSION)
465
465
466 fanouttable = [EMPTYFANOUT] * params.fanoutcount
466 fanouttable = [EMPTYFANOUT] * params.fanoutcount
467
467
468 # Precompute the location of each entry
468 # Precompute the location of each entry
469 locations = {}
469 locations = {}
470 count = 0
470 count = 0
471 for node in sorted(self.entries):
471 for node in sorted(self.entries):
472 location = count * self.INDEXENTRYLENGTH
472 location = count * self.INDEXENTRYLENGTH
473 locations[node] = location
473 locations[node] = location
474 count += 1
474 count += 1
475
475
476 # Must use [0] on the unpack result since it's always a tuple.
476 # Must use [0] on the unpack result since it's always a tuple.
477 fanoutkey = struct.unpack(params.fanoutstruct,
477 fanoutkey = struct.unpack(params.fanoutstruct,
478 node[:params.fanoutprefix])[0]
478 node[:params.fanoutprefix])[0]
479 if fanouttable[fanoutkey] == EMPTYFANOUT:
479 if fanouttable[fanoutkey] == EMPTYFANOUT:
480 fanouttable[fanoutkey] = location
480 fanouttable[fanoutkey] = location
481
481
482 rawfanouttable = ''
482 rawfanouttable = ''
483 last = 0
483 last = 0
484 for offset in fanouttable:
484 for offset in fanouttable:
485 offset = offset if offset != EMPTYFANOUT else last
485 offset = offset if offset != EMPTYFANOUT else last
486 last = offset
486 last = offset
487 rawfanouttable += struct.pack('!I', offset)
487 rawfanouttable += struct.pack('!I', offset)
488
488
489 rawentrieslength = struct.pack('!Q', len(self.entries))
489 rawentrieslength = struct.pack('!Q', len(self.entries))
490
490
491 # The index offset is its location in the file, i.e. right after the 2 byte
491 # The index offset is its location in the file, i.e. right after the 2 byte
492 # header and the fanouttable.
492 # header and the fanouttable.
493 rawindex = self.createindex(locations, 2 + len(rawfanouttable))
493 rawindex = self.createindex(locations, 2 + len(rawfanouttable))
494
494
495 self._writeheader(params)
495 self._writeheader(params)
496 self.idxfp.write(rawfanouttable)
496 self.idxfp.write(rawfanouttable)
497 self.idxfp.write(rawentrieslength)
497 self.idxfp.write(rawentrieslength)
498 self.idxfp.write(rawindex)
498 self.idxfp.write(rawindex)
499 self.idxfp.close()
499 self.idxfp.close()
500
500
501 def createindex(self, nodelocations):
501 def createindex(self, nodelocations):
502 raise NotImplementedError()
502 raise NotImplementedError()
503
503
504 def _writeheader(self, indexparams):
504 def _writeheader(self, indexparams):
505 # Index header
505 # Index header
506 # <version: 1 byte>
506 # <version: 1 byte>
507 # <large fanout: 1 bit> # 1 means 2^16, 0 means 2^8
507 # <large fanout: 1 bit> # 1 means 2^16, 0 means 2^8
508 # <unused: 7 bit> # future use (compression, delta format, etc)
508 # <unused: 7 bit> # future use (compression, delta format, etc)
509 config = 0
509 config = 0
510 if indexparams.fanoutprefix == LARGEFANOUTPREFIX:
510 if indexparams.fanoutprefix == LARGEFANOUTPREFIX:
511 config = 0b10000000
511 config = 0b10000000
512 self.idxfp.write(struct.pack('!BB', self.VERSION, config))
512 self.idxfp.write(struct.pack('!BB', self.VERSION, config))
513
513
514 class indexparams(object):
514 class indexparams(object):
515 __slots__ = (r'fanoutprefix', r'fanoutstruct', r'fanoutcount',
515 __slots__ = (r'fanoutprefix', r'fanoutstruct', r'fanoutcount',
516 r'fanoutsize', r'indexstart')
516 r'fanoutsize', r'indexstart')
517
517
518 def __init__(self, prefixsize, version):
518 def __init__(self, prefixsize, version):
519 self.fanoutprefix = prefixsize
519 self.fanoutprefix = prefixsize
520
520
521 # The struct pack format for fanout table location (i.e. the format that
521 # The struct pack format for fanout table location (i.e. the format that
522 # converts the node prefix into an integer location in the fanout
522 # converts the node prefix into an integer location in the fanout
523 # table).
523 # table).
524 if prefixsize == SMALLFANOUTPREFIX:
524 if prefixsize == SMALLFANOUTPREFIX:
525 self.fanoutstruct = '!B'
525 self.fanoutstruct = '!B'
526 elif prefixsize == LARGEFANOUTPREFIX:
526 elif prefixsize == LARGEFANOUTPREFIX:
527 self.fanoutstruct = '!H'
527 self.fanoutstruct = '!H'
528 else:
528 else:
529 raise ValueError("invalid fanout prefix size: %s" % prefixsize)
529 raise ValueError("invalid fanout prefix size: %s" % prefixsize)
530
530
531 # The number of fanout table entries
531 # The number of fanout table entries
532 self.fanoutcount = 2**(prefixsize * 8)
532 self.fanoutcount = 2**(prefixsize * 8)
533
533
534 # The total bytes used by the fanout table
534 # The total bytes used by the fanout table
535 self.fanoutsize = self.fanoutcount * 4
535 self.fanoutsize = self.fanoutcount * 4
536
536
537 self.indexstart = FANOUTSTART + self.fanoutsize
537 self.indexstart = FANOUTSTART + self.fanoutsize
538 # Skip the index length
538 # Skip the index length
539 self.indexstart += 8
539 self.indexstart += 8
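For reference, the index layout that basepack and indexparams describe above (a 2-byte version/config header, a fanout table of 4-byte offsets, an 8-byte entry count, then fixed-width index entries) works out as follows. This is an illustrative sketch, not code from the patch; the node value is made up:

    import struct

    INDEXVERSIONSIZE = 2
    FANOUTSTART = INDEXVERSIONSIZE

    for prefixsize, fanoutstruct in ((1, '!B'), (2, '!H')):
        fanoutcount = 2 ** (prefixsize * 8)        # 256 or 65536 slots
        fanoutsize = fanoutcount * 4               # 4 bytes per slot
        indexstart = FANOUTSTART + fanoutsize + 8  # skip the 8-byte entry count
        print('%d-byte prefix: count=%d size=%d indexstart=%d'
              % (prefixsize, fanoutcount, fanoutsize, indexstart))

    # Mapping a node to its fanout slot, as basepack does with params.fanoutstruct:
    node = b'\x1a\x85' + b'\x00' * 18              # hypothetical 20-byte node
    smallslot = struct.unpack('!B', node[:1])[0]   # 26
    largeslot = struct.unpack('!H', node[:2])[0]   # 6789

With the small prefix the index entries start at byte 1034; with the large prefix, at byte 262154, matching what indexparams computes.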
@@ -1,423 +1,423 @@
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import errno
3 import errno
4 import hashlib
4 import hashlib
5 import os
5 import os
6 import shutil
6 import shutil
7 import stat
7 import stat
8 import time
8 import time
9
9
10 from mercurial.i18n import _
10 from mercurial.i18n import _
11 from mercurial.node import bin, hex
11 from mercurial.node import bin, hex
12 from mercurial import (
12 from mercurial import (
13 error,
13 error,
14 pycompat,
14 pycompat,
15 util,
15 util,
16 )
16 )
17 from . import (
17 from . import (
18 constants,
18 constants,
19 shallowutil,
19 shallowutil,
20 )
20 )
21
21
22 class basestore(object):
22 class basestore(object):
23 def __init__(self, repo, path, reponame, shared=False):
23 def __init__(self, repo, path, reponame, shared=False):
24 """Creates a remotefilelog store object for the given repo name.
24 """Creates a remotefilelog store object for the given repo name.
25
25
26 `path` - The file path where this store keeps its data
26 `path` - The file path where this store keeps its data
27 `reponame` - The name of the repo. This is used to partition data from
27 `reponame` - The name of the repo. This is used to partition data from
28 many repos.
28 many repos.
29 `shared` - True if this store is a shared cache of data from the central
29 `shared` - True if this store is a shared cache of data from the central
30 server, for many repos on this machine. False means this store is for
30 server, for many repos on this machine. False means this store is for
31 the local data for one repo.
31 the local data for one repo.
32 """
32 """
33 self.repo = repo
33 self.repo = repo
34 self.ui = repo.ui
34 self.ui = repo.ui
35 self._path = path
35 self._path = path
36 self._reponame = reponame
36 self._reponame = reponame
37 self._shared = shared
37 self._shared = shared
38 self._uid = os.getuid() if not pycompat.iswindows else None
38 self._uid = os.getuid() if not pycompat.iswindows else None
39
39
40 self._validatecachelog = self.ui.config("remotefilelog",
40 self._validatecachelog = self.ui.config("remotefilelog",
41 "validatecachelog")
41 "validatecachelog")
42 self._validatecache = self.ui.config("remotefilelog", "validatecache",
42 self._validatecache = self.ui.config("remotefilelog", "validatecache",
43 'on')
43 'on')
44 if self._validatecache not in ('on', 'strict', 'off'):
44 if self._validatecache not in ('on', 'strict', 'off'):
45 self._validatecache = 'on'
45 self._validatecache = 'on'
46 if self._validatecache == 'off':
46 if self._validatecache == 'off':
47 self._validatecache = False
47 self._validatecache = False
48
48
49 if shared:
49 if shared:
50 shallowutil.mkstickygroupdir(self.ui, path)
50 shallowutil.mkstickygroupdir(self.ui, path)
51
51
52 def getmissing(self, keys):
52 def getmissing(self, keys):
53 missing = []
53 missing = []
54 for name, node in keys:
54 for name, node in keys:
55 filepath = self._getfilepath(name, node)
55 filepath = self._getfilepath(name, node)
56 exists = os.path.exists(filepath)
56 exists = os.path.exists(filepath)
57 if (exists and self._validatecache == 'strict' and
57 if (exists and self._validatecache == 'strict' and
58 not self._validatekey(filepath, 'contains')):
58 not self._validatekey(filepath, 'contains')):
59 exists = False
59 exists = False
60 if not exists:
60 if not exists:
61 missing.append((name, node))
61 missing.append((name, node))
62
62
63 return missing
63 return missing
64
64
65 # BELOW THIS ARE IMPLEMENTATIONS OF REPACK SOURCE
65 # BELOW THIS ARE IMPLEMENTATIONS OF REPACK SOURCE
66
66
67 def markledger(self, ledger, options=None):
67 def markledger(self, ledger, options=None):
68 if options and options.get(constants.OPTION_PACKSONLY):
68 if options and options.get(constants.OPTION_PACKSONLY):
69 return
69 return
70 if self._shared:
70 if self._shared:
71 for filename, nodes in self._getfiles():
71 for filename, nodes in self._getfiles():
72 for node in nodes:
72 for node in nodes:
73 ledger.markdataentry(self, filename, node)
73 ledger.markdataentry(self, filename, node)
74 ledger.markhistoryentry(self, filename, node)
74 ledger.markhistoryentry(self, filename, node)
75
75
76 def cleanup(self, ledger):
76 def cleanup(self, ledger):
77 ui = self.ui
77 ui = self.ui
78 entries = ledger.sources.get(self, [])
78 entries = ledger.sources.get(self, [])
79 count = 0
79 count = 0
80 for entry in entries:
80 for entry in entries:
81 if entry.gced or (entry.datarepacked and entry.historyrepacked):
81 if entry.gced or (entry.datarepacked and entry.historyrepacked):
82 ui.progress(_("cleaning up"), count, unit="files",
82 ui.progress(_("cleaning up"), count, unit="files",
83 total=len(entries))
83 total=len(entries))
84 path = self._getfilepath(entry.filename, entry.node)
84 path = self._getfilepath(entry.filename, entry.node)
85 util.tryunlink(path)
85 util.tryunlink(path)
86 count += 1
86 count += 1
87 ui.progress(_("cleaning up"), None)
87 ui.progress(_("cleaning up"), None)
88
88
89 # Clean up the repo cache directory.
89 # Clean up the repo cache directory.
90 self._cleanupdirectory(self._getrepocachepath())
90 self._cleanupdirectory(self._getrepocachepath())
91
91
92 # BELOW THIS ARE NON-STANDARD APIS
92 # BELOW THIS ARE NON-STANDARD APIS
93
93
94 def _cleanupdirectory(self, rootdir):
94 def _cleanupdirectory(self, rootdir):
95 """Removes the empty directories and unnecessary files within the root
95 """Removes the empty directories and unnecessary files within the root
96 directory recursively. Note that this method does not remove the root
96 directory recursively. Note that this method does not remove the root
97 directory itself. """
97 directory itself. """
98
98
99 oldfiles = set()
99 oldfiles = set()
100 otherfiles = set()
100 otherfiles = set()
101 # osutil.listdir returns stat information which saves some rmdir/listdir
101 # osutil.listdir returns stat information which saves some rmdir/listdir
102 # syscalls.
102 # syscalls.
103 for name, mode in util.osutil.listdir(rootdir):
103 for name, mode in util.osutil.listdir(rootdir):
104 if stat.S_ISDIR(mode):
104 if stat.S_ISDIR(mode):
105 dirpath = os.path.join(rootdir, name)
105 dirpath = os.path.join(rootdir, name)
106 self._cleanupdirectory(dirpath)
106 self._cleanupdirectory(dirpath)
107
107
108 # Now that the directory specified by dirpath is potentially
108 # Now that the directory specified by dirpath is potentially
109 # empty, try and remove it.
109 # empty, try and remove it.
110 try:
110 try:
111 os.rmdir(dirpath)
111 os.rmdir(dirpath)
112 except OSError:
112 except OSError:
113 pass
113 pass
114
114
115 elif stat.S_ISREG(mode):
115 elif stat.S_ISREG(mode):
116 if name.endswith('_old'):
116 if name.endswith('_old'):
117 oldfiles.add(name[:-4])
117 oldfiles.add(name[:-4])
118 else:
118 else:
119 otherfiles.add(name)
119 otherfiles.add(name)
120
120
121 # Remove the files which end with suffix '_old' and have no
121 # Remove the files which end with suffix '_old' and have no
122 # corresponding file without the suffix '_old'. See addremotefilelognode
122 # corresponding file without the suffix '_old'. See addremotefilelognode
123 # method for the generation/purpose of files with '_old' suffix.
123 # method for the generation/purpose of files with '_old' suffix.
124 for filename in oldfiles - otherfiles:
124 for filename in oldfiles - otherfiles:
125 filepath = os.path.join(rootdir, filename + '_old')
125 filepath = os.path.join(rootdir, filename + '_old')
126 util.tryunlink(filepath)
126 util.tryunlink(filepath)
127
127
128 def _getfiles(self):
128 def _getfiles(self):
129 """Return a list of (filename, [node,...]) for all the revisions that
129 """Return a list of (filename, [node,...]) for all the revisions that
130 exist in the store.
130 exist in the store.
131
131
132 This is useful for obtaining a list of all the contents of the store
132 This is useful for obtaining a list of all the contents of the store
133 when performing a repack to another store, since the store API requires
133 when performing a repack to another store, since the store API requires
134 name+node keys and not namehash+node keys.
134 name+node keys and not namehash+node keys.
135 """
135 """
136 existing = {}
136 existing = {}
137 for filenamehash, node in self._listkeys():
137 for filenamehash, node in self._listkeys():
138 existing.setdefault(filenamehash, []).append(node)
138 existing.setdefault(filenamehash, []).append(node)
139
139
140 filenamemap = self._resolvefilenames(existing.keys())
140 filenamemap = self._resolvefilenames(existing.keys())
141
141
142 for filename, sha in filenamemap.iteritems():
142 for filename, sha in filenamemap.iteritems():
143 yield (filename, existing[sha])
143 yield (filename, existing[sha])
144
144
145 def _resolvefilenames(self, hashes):
145 def _resolvefilenames(self, hashes):
146 """Given a list of filename hashes that are present in the
146 """Given a list of filename hashes that are present in the
147 remotefilelog store, return a mapping from filename->hash.
147 remotefilelog store, return a mapping from filename->hash.
148
148
149 This is useful when converting remotefilelog blobs into other storage
149 This is useful when converting remotefilelog blobs into other storage
150 formats.
150 formats.
151 """
151 """
152 if not hashes:
152 if not hashes:
153 return {}
153 return {}
154
154
155 filenames = {}
155 filenames = {}
156 missingfilename = set(hashes)
156 missingfilename = set(hashes)
157
157
158 # Start with a full manifest, since it'll cover the majority of files
158 # Start with a full manifest, since it'll cover the majority of files
159 for filename in self.repo['tip'].manifest():
159 for filename in self.repo['tip'].manifest():
160 sha = hashlib.sha1(filename).digest()
160 sha = hashlib.sha1(filename).digest()
161 if sha in missingfilename:
161 if sha in missingfilename:
162 filenames[filename] = sha
162 filenames[filename] = sha
163 missingfilename.discard(sha)
163 missingfilename.discard(sha)
164
164
165 # Scan the changelog until we've found every file name
165 # Scan the changelog until we've found every file name
166 cl = self.repo.unfiltered().changelog
166 cl = self.repo.unfiltered().changelog
167 for rev in pycompat.xrange(len(cl) - 1, -1, -1):
167 for rev in pycompat.xrange(len(cl) - 1, -1, -1):
168 if not missingfilename:
168 if not missingfilename:
169 break
169 break
170 files = cl.readfiles(cl.node(rev))
170 files = cl.readfiles(cl.node(rev))
171 for filename in files:
171 for filename in files:
172 sha = hashlib.sha1(filename).digest()
172 sha = hashlib.sha1(filename).digest()
173 if sha in missingfilename:
173 if sha in missingfilename:
174 filenames[filename] = sha
174 filenames[filename] = sha
175 missingfilename.discard(sha)
175 missingfilename.discard(sha)
176
176
177 return filenames
177 return filenames
178
178
179 def _getrepocachepath(self):
179 def _getrepocachepath(self):
180 return os.path.join(
180 return os.path.join(
181 self._path, self._reponame) if self._shared else self._path
181 self._path, self._reponame) if self._shared else self._path
182
182
183 def _listkeys(self):
183 def _listkeys(self):
184 """List all the remotefilelog keys that exist in the store.
184 """List all the remotefilelog keys that exist in the store.
185
185
186 Returns an iterator of (filename hash, filecontent hash) tuples.
186 Returns an iterator of (filename hash, filecontent hash) tuples.
187 """
187 """
188
188
189 for root, dirs, files in os.walk(self._getrepocachepath()):
189 for root, dirs, files in os.walk(self._getrepocachepath()):
190 for filename in files:
190 for filename in files:
191 if len(filename) != 40:
191 if len(filename) != 40:
192 continue
192 continue
193 node = filename
193 node = filename
194 if self._shared:
194 if self._shared:
195 # .../1a/85ffda..be21
195 # .../1a/85ffda..be21
196 filenamehash = root[-41:-39] + root[-38:]
196 filenamehash = root[-41:-39] + root[-38:]
197 else:
197 else:
198 filenamehash = root[-40:]
198 filenamehash = root[-40:]
199 yield (bin(filenamehash), bin(node))
199 yield (bin(filenamehash), bin(node))
200
200
201 def _getfilepath(self, name, node):
201 def _getfilepath(self, name, node):
202 node = hex(node)
202 node = hex(node)
203 if self._shared:
203 if self._shared:
204 key = shallowutil.getcachekey(self._reponame, name, node)
204 key = shallowutil.getcachekey(self._reponame, name, node)
205 else:
205 else:
206 key = shallowutil.getlocalkey(name, node)
206 key = shallowutil.getlocalkey(name, node)
207
207
208 return os.path.join(self._path, key)
208 return os.path.join(self._path, key)
209
209
210 def _getdata(self, name, node):
210 def _getdata(self, name, node):
211 filepath = self._getfilepath(name, node)
211 filepath = self._getfilepath(name, node)
212 try:
212 try:
213 data = shallowutil.readfile(filepath)
213 data = shallowutil.readfile(filepath)
214 if self._validatecache and not self._validatedata(data, filepath):
214 if self._validatecache and not self._validatedata(data, filepath):
215 if self._validatecachelog:
215 if self._validatecachelog:
216 with open(self._validatecachelog, 'a+') as f:
216 with open(self._validatecachelog, 'a+') as f:
217 f.write("corrupt %s during read\n" % filepath)
217 f.write("corrupt %s during read\n" % filepath)
218 os.rename(filepath, filepath + ".corrupt")
218 os.rename(filepath, filepath + ".corrupt")
219 raise KeyError("corrupt local cache file %s" % filepath)
219 raise KeyError("corrupt local cache file %s" % filepath)
220 except IOError:
220 except IOError:
221 raise KeyError("no file found at %s for %s:%s" % (filepath, name,
221 raise KeyError("no file found at %s for %s:%s" % (filepath, name,
222 hex(node)))
222 hex(node)))
223
223
224 return data
224 return data
225
225
226 def addremotefilelognode(self, name, node, data):
226 def addremotefilelognode(self, name, node, data):
227 filepath = self._getfilepath(name, node)
227 filepath = self._getfilepath(name, node)
228
228
229 oldumask = os.umask(0o002)
229 oldumask = os.umask(0o002)
230 try:
230 try:
231 # if this node already exists, save the old version for
231 # if this node already exists, save the old version for
232 # recovery/debugging purposes.
232 # recovery/debugging purposes.
233 if os.path.exists(filepath):
233 if os.path.exists(filepath):
234 newfilename = filepath + '_old'
234 newfilename = filepath + '_old'
235 # newfilename can be read-only and shutil.copy will fail.
235 # newfilename can be read-only and shutil.copy will fail.
236 # Delete newfilename to avoid it
236 # Delete newfilename to avoid it
237 if os.path.exists(newfilename):
237 if os.path.exists(newfilename):
238 shallowutil.unlinkfile(newfilename)
238 shallowutil.unlinkfile(newfilename)
239 shutil.copy(filepath, newfilename)
239 shutil.copy(filepath, newfilename)
240
240
241 shallowutil.mkstickygroupdir(self.ui, os.path.dirname(filepath))
241 shallowutil.mkstickygroupdir(self.ui, os.path.dirname(filepath))
242 shallowutil.writefile(filepath, data, readonly=True)
242 shallowutil.writefile(filepath, data, readonly=True)
243
243
244 if self._validatecache:
244 if self._validatecache:
245 if not self._validatekey(filepath, 'write'):
245 if not self._validatekey(filepath, 'write'):
246 raise error.Abort(_("local cache write was corrupted %s") %
246 raise error.Abort(_("local cache write was corrupted %s") %
247 filepath)
247 filepath)
248 finally:
248 finally:
249 os.umask(oldumask)
249 os.umask(oldumask)
250
250
251 def markrepo(self, path):
251 def markrepo(self, path):
252 """Call this to add the given repo path to the store's list of
252 """Call this to add the given repo path to the store's list of
253 repositories that are using it. This is useful later when doing garbage
253 repositories that are using it. This is useful later when doing garbage
254 collection, since it allows us to inspect the repos to see what nodes
254 collection, since it allows us to inspect the repos to see what nodes
255 they want to be kept alive in the store.
255 they want to be kept alive in the store.
256 """
256 """
257 repospath = os.path.join(self._path, "repos")
257 repospath = os.path.join(self._path, "repos")
258 - with open(repospath, 'a') as reposfile:
258 + with open(repospath, 'ab') as reposfile:
259 reposfile.write(os.path.dirname(path) + "\n")
259 reposfile.write(os.path.dirname(path) + "\n")
260
260
261 repospathstat = os.stat(repospath)
261 repospathstat = os.stat(repospath)
262 if repospathstat.st_uid == self._uid:
262 if repospathstat.st_uid == self._uid:
263 os.chmod(repospath, 0o0664)
263 os.chmod(repospath, 0o0664)
264
264
265 def _validatekey(self, path, action):
265 def _validatekey(self, path, action):
266 with open(path, 'rb') as f:
266 with open(path, 'rb') as f:
267 data = f.read()
267 data = f.read()
268
268
269 if self._validatedata(data, path):
269 if self._validatedata(data, path):
270 return True
270 return True
271
271
272 if self._validatecachelog:
272 if self._validatecachelog:
273 - with open(self._validatecachelog, 'a+') as f:
273 + with open(self._validatecachelog, 'ab+') as f:
274 f.write("corrupt %s during %s\n" % (path, action))
274 f.write("corrupt %s during %s\n" % (path, action))
275
275
276 os.rename(path, path + ".corrupt")
276 os.rename(path, path + ".corrupt")
277 return False
277 return False
278
278
279 def _validatedata(self, data, path):
279 def _validatedata(self, data, path):
280 try:
280 try:
281 if len(data) > 0:
281 if len(data) > 0:
282 # see remotefilelogserver.createfileblob for the format
282 # see remotefilelogserver.createfileblob for the format
283 offset, size, flags = shallowutil.parsesizeflags(data)
283 offset, size, flags = shallowutil.parsesizeflags(data)
284 if len(data) <= size:
284 if len(data) <= size:
285 # it is truncated
285 # it is truncated
286 return False
286 return False
287
287
288 # extract the node from the metadata
288 # extract the node from the metadata
289 offset += size
289 offset += size
290 datanode = data[offset:offset + 20]
290 datanode = data[offset:offset + 20]
291
291
292 # and compare against the path
292 # and compare against the path
293 if os.path.basename(path) == hex(datanode):
293 if os.path.basename(path) == hex(datanode):
294 # Content matches the intended path
294 # Content matches the intended path
295 return True
295 return True
296 return False
296 return False
297 except (ValueError, RuntimeError):
297 except (ValueError, RuntimeError):
298 pass
298 pass
299
299
300 return False
300 return False
301
301
302 def gc(self, keepkeys):
302 def gc(self, keepkeys):
303 ui = self.ui
303 ui = self.ui
304 cachepath = self._path
304 cachepath = self._path
305 _removing = _("removing unnecessary files")
305 _removing = _("removing unnecessary files")
306 _truncating = _("enforcing cache limit")
306 _truncating = _("enforcing cache limit")
307
307
308 # prune cache
308 # prune cache
309 import Queue
309 import Queue
310 queue = Queue.PriorityQueue()
310 queue = Queue.PriorityQueue()
311 originalsize = 0
311 originalsize = 0
312 size = 0
312 size = 0
313 count = 0
313 count = 0
314 removed = 0
314 removed = 0
315
315
316 # keep files newer than a day even if they aren't needed
316 # keep files newer than a day even if they aren't needed
317 limit = time.time() - (60 * 60 * 24)
317 limit = time.time() - (60 * 60 * 24)
318
318
319 ui.progress(_removing, count, unit="files")
319 ui.progress(_removing, count, unit="files")
320 for root, dirs, files in os.walk(cachepath):
320 for root, dirs, files in os.walk(cachepath):
321 for file in files:
321 for file in files:
322 if file == 'repos':
322 if file == 'repos':
323 continue
323 continue
324
324
325 # Don't delete pack files
325 # Don't delete pack files
326 if '/packs/' in root:
326 if '/packs/' in root:
327 continue
327 continue
328
328
329 ui.progress(_removing, count, unit="files")
329 ui.progress(_removing, count, unit="files")
330 path = os.path.join(root, file)
330 path = os.path.join(root, file)
331 key = os.path.relpath(path, cachepath)
331 key = os.path.relpath(path, cachepath)
332 count += 1
332 count += 1
333 try:
333 try:
334 pathstat = os.stat(path)
334 pathstat = os.stat(path)
335 except OSError as e:
335 except OSError as e:
336 # errno.ENOENT = no such file or directory
336 # errno.ENOENT = no such file or directory
337 if e.errno != errno.ENOENT:
337 if e.errno != errno.ENOENT:
338 raise
338 raise
339 msg = _("warning: file %s was removed by another process\n")
339 msg = _("warning: file %s was removed by another process\n")
340 ui.warn(msg % path)
340 ui.warn(msg % path)
341 continue
341 continue
342
342
343 originalsize += pathstat.st_size
343 originalsize += pathstat.st_size
344
344
345 if key in keepkeys or pathstat.st_atime > limit:
345 if key in keepkeys or pathstat.st_atime > limit:
346 queue.put((pathstat.st_atime, path, pathstat))
346 queue.put((pathstat.st_atime, path, pathstat))
347 size += pathstat.st_size
347 size += pathstat.st_size
348 else:
348 else:
349 try:
349 try:
350 shallowutil.unlinkfile(path)
350 shallowutil.unlinkfile(path)
351 except OSError as e:
351 except OSError as e:
352 # errno.ENOENT = no such file or directory
352 # errno.ENOENT = no such file or directory
353 if e.errno != errno.ENOENT:
353 if e.errno != errno.ENOENT:
354 raise
354 raise
355 msg = _("warning: file %s was removed by another "
355 msg = _("warning: file %s was removed by another "
356 "process\n")
356 "process\n")
357 ui.warn(msg % path)
357 ui.warn(msg % path)
358 continue
358 continue
359 removed += 1
359 removed += 1
360 ui.progress(_removing, None)
360 ui.progress(_removing, None)
361
361
362 # remove oldest files until under limit
362 # remove oldest files until under limit
363 limit = ui.configbytes("remotefilelog", "cachelimit")
363 limit = ui.configbytes("remotefilelog", "cachelimit")
364 if size > limit:
364 if size > limit:
365 excess = size - limit
365 excess = size - limit
366 removedexcess = 0
366 removedexcess = 0
367 while queue and size > limit and size > 0:
367 while queue and size > limit and size > 0:
368 ui.progress(_truncating, removedexcess, unit="bytes",
368 ui.progress(_truncating, removedexcess, unit="bytes",
369 total=excess)
369 total=excess)
370 atime, oldpath, oldpathstat = queue.get()
370 atime, oldpath, oldpathstat = queue.get()
371 try:
371 try:
372 shallowutil.unlinkfile(oldpath)
372 shallowutil.unlinkfile(oldpath)
373 except OSError as e:
373 except OSError as e:
374 # errno.ENOENT = no such file or directory
374 # errno.ENOENT = no such file or directory
375 if e.errno != errno.ENOENT:
375 if e.errno != errno.ENOENT:
376 raise
376 raise
377 msg = _("warning: file %s was removed by another process\n")
377 msg = _("warning: file %s was removed by another process\n")
378 ui.warn(msg % oldpath)
378 ui.warn(msg % oldpath)
379 size -= oldpathstat.st_size
379 size -= oldpathstat.st_size
380 removed += 1
380 removed += 1
381 removedexcess += oldpathstat.st_size
381 removedexcess += oldpathstat.st_size
382 ui.progress(_truncating, None)
382 ui.progress(_truncating, None)
383
383
384 ui.status(_("finished: removed %s of %s files (%0.2f GB to %0.2f GB)\n")
384 ui.status(_("finished: removed %s of %s files (%0.2f GB to %0.2f GB)\n")
385 % (removed, count,
385 % (removed, count,
386 float(originalsize) / 1024.0 / 1024.0 / 1024.0,
386 float(originalsize) / 1024.0 / 1024.0 / 1024.0,
387 float(size) / 1024.0 / 1024.0 / 1024.0))
387 float(size) / 1024.0 / 1024.0 / 1024.0))
388
388
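The eviction pass above keeps a priority queue of (atime, path, stat) entries and, once the cache exceeds remotefilelog.cachelimit, unlinks the least recently accessed files until the total size drops back under the limit. The following is a minimal, self-contained sketch of that idea using heapq over made-up (atime, path, size) records instead of real files; it is illustrative only and not part of the extension.

import heapq

def evict_until_under_limit(entries, size, limit):
    # entries: (atime, path, entrysize) tuples whose sizes sum to `size`.
    # Drop the least recently accessed entries until we are under `limit`.
    heapq.heapify(entries)
    removed = []
    while entries and size > limit:
        atime, path, entrysize = heapq.heappop(entries)
        size -= entrysize
        removed.append(path)
    return size, removed

# 30 bytes cached, 12 byte limit: the two oldest entries go first.
size, removed = evict_until_under_limit(
    [(100, 'a', 10), (50, 'b', 10), (200, 'c', 10)], 30, 12)
assert removed == ['b', 'a'] and size == 10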
389 class baseunionstore(object):
389 class baseunionstore(object):
390 def __init__(self, *args, **kwargs):
390 def __init__(self, *args, **kwargs):
391 # If one of the functions that iterates over all of the stores is about to
391 # If one of the functions that iterates over all of the stores is about to
392 # throw a KeyError, try this many times with a full refresh between
392 # throw a KeyError, try this many times with a full refresh between
393 # attempts. A repack operation may have moved data from one store to
393 # attempts. A repack operation may have moved data from one store to
394 # another while we were running.
394 # another while we were running.
395 self.numattempts = kwargs.get(r'numretries', 0) + 1
395 self.numattempts = kwargs.get(r'numretries', 0) + 1
396 # If not None, this function is called on every retry and when the attempts
396 # If not None, this function is called on every retry and when the attempts
397 # are exhausted.
397 # are exhausted.
398 self.retrylog = kwargs.get(r'retrylog', None)
398 self.retrylog = kwargs.get(r'retrylog', None)
399
399
400 def markforrefresh(self):
400 def markforrefresh(self):
401 for store in self.stores:
401 for store in self.stores:
402 if util.safehasattr(store, 'markforrefresh'):
402 if util.safehasattr(store, 'markforrefresh'):
403 store.markforrefresh()
403 store.markforrefresh()
404
404
405 @staticmethod
405 @staticmethod
406 def retriable(fn):
406 def retriable(fn):
407 def noop(*args):
407 def noop(*args):
408 pass
408 pass
409 def wrapped(self, *args, **kwargs):
409 def wrapped(self, *args, **kwargs):
410 retrylog = self.retrylog or noop
410 retrylog = self.retrylog or noop
411 funcname = fn.__name__
411 funcname = fn.__name__
412 for i in pycompat.xrange(self.numattempts):
412 for i in pycompat.xrange(self.numattempts):
413 if i > 0:
413 if i > 0:
414 retrylog('re-attempting (n=%d) %s\n' % (i, funcname))
414 retrylog('re-attempting (n=%d) %s\n' % (i, funcname))
415 self.markforrefresh()
415 self.markforrefresh()
416 try:
416 try:
417 return fn(self, *args, **kwargs)
417 return fn(self, *args, **kwargs)
418 except KeyError:
418 except KeyError:
419 pass
419 pass
420 # retries exhausted
420 # retries exhausted
421 retrylog('retries exhausted in %s, raising KeyError\n' % funcname)
421 retrylog('retries exhausted in %s, raising KeyError\n' % funcname)
422 raise
422 raise
423 return wrapped
423 return wrapped
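The retriable wrapper above simply re-runs the decorated method after a markforrefresh() whenever it raises KeyError, up to numattempts times. Below is a self-contained sketch of the same retry-on-KeyError pattern; the toystore class and its refresh() method are invented for the example and are not part of remotefilelog.

import functools

def retriable(numattempts):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapped(self, *args, **kwargs):
            for attempt in range(numattempts):
                if attempt > 0:
                    self.refresh()   # analogous to markforrefresh()
                try:
                    return fn(self, *args, **kwargs)
                except KeyError:
                    pass
            raise KeyError('%s: retries exhausted' % fn.__name__)
        return wrapped
    return decorator

class toystore(object):
    def __init__(self):
        self.data = {}
    def refresh(self):
        # Pretend a concurrent repack just made the key visible.
        self.data['x'] = 42
    @retriable(numattempts=2)
    def get(self, key):
        return self.data[key]

assert toystore().get('x') == 42   # first attempt misses, retry succeeds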
@@ -1,378 +1,378
1 # debugcommands.py - debug logic for remotefilelog
1 # debugcommands.py - debug logic for remotefilelog
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import hashlib
9 import hashlib
10 import os
10 import os
11 import zlib
11 import zlib
12
12
13 from mercurial.node import bin, hex, nullid, short
13 from mercurial.node import bin, hex, nullid, short
14 from mercurial.i18n import _
14 from mercurial.i18n import _
15 from mercurial import (
15 from mercurial import (
16 error,
16 error,
17 filelog,
17 filelog,
18 node as nodemod,
18 node as nodemod,
19 revlog,
19 revlog,
20 )
20 )
21 from . import (
21 from . import (
22 constants,
22 constants,
23 datapack,
23 datapack,
24 extutil,
24 extutil,
25 fileserverclient,
25 fileserverclient,
26 historypack,
26 historypack,
27 repack,
27 repack,
28 shallowutil,
28 shallowutil,
29 )
29 )
30
30
31 def debugremotefilelog(ui, path, **opts):
31 def debugremotefilelog(ui, path, **opts):
32 decompress = opts.get(r'decompress')
32 decompress = opts.get(r'decompress')
33
33
34 size, firstnode, mapping = parsefileblob(path, decompress)
34 size, firstnode, mapping = parsefileblob(path, decompress)
35
35
36 ui.status(_("size: %s bytes\n") % (size))
36 ui.status(_("size: %s bytes\n") % (size))
37 ui.status(_("path: %s \n") % (path))
37 ui.status(_("path: %s \n") % (path))
38 ui.status(_("key: %s \n") % (short(firstnode)))
38 ui.status(_("key: %s \n") % (short(firstnode)))
39 ui.status(_("\n"))
39 ui.status(_("\n"))
40 ui.status(_("%12s => %12s %13s %13s %12s\n") %
40 ui.status(_("%12s => %12s %13s %13s %12s\n") %
41 ("node", "p1", "p2", "linknode", "copyfrom"))
41 ("node", "p1", "p2", "linknode", "copyfrom"))
42
42
43 queue = [firstnode]
43 queue = [firstnode]
44 while queue:
44 while queue:
45 node = queue.pop(0)
45 node = queue.pop(0)
46 p1, p2, linknode, copyfrom = mapping[node]
46 p1, p2, linknode, copyfrom = mapping[node]
47 ui.status(_("%s => %s %s %s %s\n") %
47 ui.status(_("%s => %s %s %s %s\n") %
48 (short(node), short(p1), short(p2), short(linknode), copyfrom))
48 (short(node), short(p1), short(p2), short(linknode), copyfrom))
49 if p1 != nullid:
49 if p1 != nullid:
50 queue.append(p1)
50 queue.append(p1)
51 if p2 != nullid:
51 if p2 != nullid:
52 queue.append(p2)
52 queue.append(p2)
53
53
54 def buildtemprevlog(repo, file):
54 def buildtemprevlog(repo, file):
55 # get filename key
55 # get filename key
56 filekey = nodemod.hex(hashlib.sha1(file).digest())
56 filekey = nodemod.hex(hashlib.sha1(file).digest())
57 filedir = os.path.join(repo.path, 'store/data', filekey)
57 filedir = os.path.join(repo.path, 'store/data', filekey)
58
58
59 # sort all entries based on linkrev
59 # sort all entries based on linkrev
60 fctxs = []
60 fctxs = []
61 for filenode in os.listdir(filedir):
61 for filenode in os.listdir(filedir):
62 if '_old' not in filenode:
62 if '_old' not in filenode:
63 fctxs.append(repo.filectx(file, fileid=bin(filenode)))
63 fctxs.append(repo.filectx(file, fileid=bin(filenode)))
64
64
65 fctxs = sorted(fctxs, key=lambda x: x.linkrev())
65 fctxs = sorted(fctxs, key=lambda x: x.linkrev())
66
66
67 # add to revlog
67 # add to revlog
68 temppath = repo.sjoin('data/temprevlog.i')
68 temppath = repo.sjoin('data/temprevlog.i')
69 if os.path.exists(temppath):
69 if os.path.exists(temppath):
70 os.remove(temppath)
70 os.remove(temppath)
71 r = filelog.filelog(repo.svfs, 'temprevlog')
71 r = filelog.filelog(repo.svfs, 'temprevlog')
72
72
73 class faket(object):
73 class faket(object):
74 def add(self, a, b, c):
74 def add(self, a, b, c):
75 pass
75 pass
76 t = faket()
76 t = faket()
77 for fctx in fctxs:
77 for fctx in fctxs:
78 if fctx.node() not in repo:
78 if fctx.node() not in repo:
79 continue
79 continue
80
80
81 p = fctx.filelog().parents(fctx.filenode())
81 p = fctx.filelog().parents(fctx.filenode())
82 meta = {}
82 meta = {}
83 if fctx.renamed():
83 if fctx.renamed():
84 meta['copy'] = fctx.renamed()[0]
84 meta['copy'] = fctx.renamed()[0]
85 meta['copyrev'] = hex(fctx.renamed()[1])
85 meta['copyrev'] = hex(fctx.renamed()[1])
86
86
87 r.add(fctx.data(), meta, t, fctx.linkrev(), p[0], p[1])
87 r.add(fctx.data(), meta, t, fctx.linkrev(), p[0], p[1])
88
88
89 return r
89 return r
90
90
91 def debugindex(orig, ui, repo, file_=None, **opts):
91 def debugindex(orig, ui, repo, file_=None, **opts):
92 """dump the contents of an index file"""
92 """dump the contents of an index file"""
93 if (opts.get(r'changelog') or
93 if (opts.get(r'changelog') or
94 opts.get(r'manifest') or
94 opts.get(r'manifest') or
95 opts.get(r'dir') or
95 opts.get(r'dir') or
96 not shallowutil.isenabled(repo) or
96 not shallowutil.isenabled(repo) or
97 not repo.shallowmatch(file_)):
97 not repo.shallowmatch(file_)):
98 return orig(ui, repo, file_, **opts)
98 return orig(ui, repo, file_, **opts)
99
99
100 r = buildtemprevlog(repo, file_)
100 r = buildtemprevlog(repo, file_)
101
101
102 # debugindex like normal
102 # debugindex like normal
103 format = opts.get('format', 0)
103 format = opts.get('format', 0)
104 if format not in (0, 1):
104 if format not in (0, 1):
105 raise error.Abort(_("unknown format %d") % format)
105 raise error.Abort(_("unknown format %d") % format)
106
106
107 generaldelta = r.version & revlog.FLAG_GENERALDELTA
107 generaldelta = r.version & revlog.FLAG_GENERALDELTA
108 if generaldelta:
108 if generaldelta:
109 basehdr = ' delta'
109 basehdr = ' delta'
110 else:
110 else:
111 basehdr = ' base'
111 basehdr = ' base'
112
112
113 if format == 0:
113 if format == 0:
114 ui.write((" rev offset length " + basehdr + " linkrev"
114 ui.write((" rev offset length " + basehdr + " linkrev"
115 " nodeid p1 p2\n"))
115 " nodeid p1 p2\n"))
116 elif format == 1:
116 elif format == 1:
117 ui.write((" rev flag offset length"
117 ui.write((" rev flag offset length"
118 " size " + basehdr + " link p1 p2"
118 " size " + basehdr + " link p1 p2"
119 " nodeid\n"))
119 " nodeid\n"))
120
120
121 for i in r:
121 for i in r:
122 node = r.node(i)
122 node = r.node(i)
123 if generaldelta:
123 if generaldelta:
124 base = r.deltaparent(i)
124 base = r.deltaparent(i)
125 else:
125 else:
126 base = r.chainbase(i)
126 base = r.chainbase(i)
127 if format == 0:
127 if format == 0:
128 try:
128 try:
129 pp = r.parents(node)
129 pp = r.parents(node)
130 except Exception:
130 except Exception:
131 pp = [nullid, nullid]
131 pp = [nullid, nullid]
132 ui.write("% 6d % 9d % 7d % 6d % 7d %s %s %s\n" % (
132 ui.write("% 6d % 9d % 7d % 6d % 7d %s %s %s\n" % (
133 i, r.start(i), r.length(i), base, r.linkrev(i),
133 i, r.start(i), r.length(i), base, r.linkrev(i),
134 short(node), short(pp[0]), short(pp[1])))
134 short(node), short(pp[0]), short(pp[1])))
135 elif format == 1:
135 elif format == 1:
136 pr = r.parentrevs(i)
136 pr = r.parentrevs(i)
137 ui.write("% 6d %04x % 8d % 8d % 8d % 6d % 6d % 6d % 6d %s\n" % (
137 ui.write("% 6d %04x % 8d % 8d % 8d % 6d % 6d % 6d % 6d %s\n" % (
138 i, r.flags(i), r.start(i), r.length(i), r.rawsize(i),
138 i, r.flags(i), r.start(i), r.length(i), r.rawsize(i),
139 base, r.linkrev(i), pr[0], pr[1], short(node)))
139 base, r.linkrev(i), pr[0], pr[1], short(node)))
140
140
141 def debugindexdot(orig, ui, repo, file_):
141 def debugindexdot(orig, ui, repo, file_):
142 """dump an index DAG as a graphviz dot file"""
142 """dump an index DAG as a graphviz dot file"""
143 if not shallowutil.isenabled(repo):
143 if not shallowutil.isenabled(repo):
144 return orig(ui, repo, file_)
144 return orig(ui, repo, file_)
145
145
146 r = buildtemprevlog(repo, os.path.basename(file_)[:-2])
146 r = buildtemprevlog(repo, os.path.basename(file_)[:-2])
147
147
148 ui.write(("digraph G {\n"))
148 ui.write(("digraph G {\n"))
149 for i in r:
149 for i in r:
150 node = r.node(i)
150 node = r.node(i)
151 pp = r.parents(node)
151 pp = r.parents(node)
152 ui.write("\t%d -> %d\n" % (r.rev(pp[0]), i))
152 ui.write("\t%d -> %d\n" % (r.rev(pp[0]), i))
153 if pp[1] != nullid:
153 if pp[1] != nullid:
154 ui.write("\t%d -> %d\n" % (r.rev(pp[1]), i))
154 ui.write("\t%d -> %d\n" % (r.rev(pp[1]), i))
155 ui.write("}\n")
155 ui.write("}\n")
156
156
157 def verifyremotefilelog(ui, path, **opts):
157 def verifyremotefilelog(ui, path, **opts):
158 decompress = opts.get(r'decompress')
158 decompress = opts.get(r'decompress')
159
159
160 for root, dirs, files in os.walk(path):
160 for root, dirs, files in os.walk(path):
161 for file in files:
161 for file in files:
162 if file == "repos":
162 if file == "repos":
163 continue
163 continue
164 filepath = os.path.join(root, file)
164 filepath = os.path.join(root, file)
165 size, firstnode, mapping = parsefileblob(filepath, decompress)
165 size, firstnode, mapping = parsefileblob(filepath, decompress)
166 for p1, p2, linknode, copyfrom in mapping.itervalues():
166 for p1, p2, linknode, copyfrom in mapping.itervalues():
167 if linknode == nullid:
167 if linknode == nullid:
168 actualpath = os.path.relpath(root, path)
168 actualpath = os.path.relpath(root, path)
169 key = fileserverclient.getcachekey("reponame", actualpath,
169 key = fileserverclient.getcachekey("reponame", actualpath,
170 file)
170 file)
171 ui.status("%s %s\n" % (key, os.path.relpath(filepath,
171 ui.status("%s %s\n" % (key, os.path.relpath(filepath,
172 path)))
172 path)))
173
173
174 def _decompressblob(raw):
174 def _decompressblob(raw):
175 return zlib.decompress(raw)
175 return zlib.decompress(raw)
176
176
177 def parsefileblob(path, decompress):
177 def parsefileblob(path, decompress):
178 raw = None
178 raw = None
179 f = open(path, "r")
179 f = open(path, "rb")
180 try:
180 try:
181 raw = f.read()
181 raw = f.read()
182 finally:
182 finally:
183 f.close()
183 f.close()
184
184
185 if decompress:
185 if decompress:
186 raw = _decompressblob(raw)
186 raw = _decompressblob(raw)
187
187
188 offset, size, flags = shallowutil.parsesizeflags(raw)
188 offset, size, flags = shallowutil.parsesizeflags(raw)
189 start = offset + size
189 start = offset + size
190
190
191 firstnode = None
191 firstnode = None
192
192
193 mapping = {}
193 mapping = {}
194 while start < len(raw):
194 while start < len(raw):
195 divider = raw.index('\0', start + 80)
195 divider = raw.index('\0', start + 80)
196
196
197 currentnode = raw[start:(start + 20)]
197 currentnode = raw[start:(start + 20)]
198 if not firstnode:
198 if not firstnode:
199 firstnode = currentnode
199 firstnode = currentnode
200
200
201 p1 = raw[(start + 20):(start + 40)]
201 p1 = raw[(start + 20):(start + 40)]
202 p2 = raw[(start + 40):(start + 60)]
202 p2 = raw[(start + 40):(start + 60)]
203 linknode = raw[(start + 60):(start + 80)]
203 linknode = raw[(start + 60):(start + 80)]
204 copyfrom = raw[(start + 80):divider]
204 copyfrom = raw[(start + 80):divider]
205
205
206 mapping[currentnode] = (p1, p2, linknode, copyfrom)
206 mapping[currentnode] = (p1, p2, linknode, copyfrom)
207 start = divider + 1
207 start = divider + 1
208
208
209 return size, firstnode, mapping
209 return size, firstnode, mapping
210
210
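parsefileblob above walks the history section of a remotefilelog blob: after the size header, each ancestor entry is four 20-byte binary hashes (node, p1, p2, linknode) followed by a copyfrom path terminated by a NUL byte. A small round trip of that entry layout, with toy values rather than a real blob:

node = b'\x11' * 20
p1 = b'\x22' * 20
p2 = b'\x00' * 20
linknode = b'\x33' * 20
copyfrom = b'old/path'

entry = node + p1 + p2 + linknode + copyfrom + b'\0'

# Decode it the same way parsefileblob slices the raw text.
start = 0
divider = entry.index(b'\0', start + 80)
assert entry[start:start + 20] == node
assert entry[(start + 60):(start + 80)] == linknode
assert entry[(start + 80):divider] == copyfrom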
211 def debugdatapack(ui, *paths, **opts):
211 def debugdatapack(ui, *paths, **opts):
212 for path in paths:
212 for path in paths:
213 if '.data' in path:
213 if '.data' in path:
214 path = path[:path.index('.data')]
214 path = path[:path.index('.data')]
215 ui.write("%s:\n" % path)
215 ui.write("%s:\n" % path)
216 dpack = datapack.datapack(path)
216 dpack = datapack.datapack(path)
217 node = opts.get(r'node')
217 node = opts.get(r'node')
218 if node:
218 if node:
219 deltachain = dpack.getdeltachain('', bin(node))
219 deltachain = dpack.getdeltachain('', bin(node))
220 dumpdeltachain(ui, deltachain, **opts)
220 dumpdeltachain(ui, deltachain, **opts)
221 return
221 return
222
222
223 if opts.get(r'long'):
223 if opts.get(r'long'):
224 hashformatter = hex
224 hashformatter = hex
225 hashlen = 42
225 hashlen = 42
226 else:
226 else:
227 hashformatter = short
227 hashformatter = short
228 hashlen = 14
228 hashlen = 14
229
229
230 lastfilename = None
230 lastfilename = None
231 totaldeltasize = 0
231 totaldeltasize = 0
232 totalblobsize = 0
232 totalblobsize = 0
233 def printtotals():
233 def printtotals():
234 if lastfilename is not None:
234 if lastfilename is not None:
235 ui.write("\n")
235 ui.write("\n")
236 if not totaldeltasize or not totalblobsize:
236 if not totaldeltasize or not totalblobsize:
237 return
237 return
238 difference = totalblobsize - totaldeltasize
238 difference = totalblobsize - totaldeltasize
239 deltastr = "%0.1f%% %s" % (
239 deltastr = "%0.1f%% %s" % (
240 (100.0 * abs(difference) / totalblobsize),
240 (100.0 * abs(difference) / totalblobsize),
241 ("smaller" if difference > 0 else "bigger"))
241 ("smaller" if difference > 0 else "bigger"))
242
242
243 ui.write(("Total:%s%s %s (%s)\n") % (
243 ui.write(("Total:%s%s %s (%s)\n") % (
244 "".ljust(2 * hashlen - len("Total:")),
244 "".ljust(2 * hashlen - len("Total:")),
245 str(totaldeltasize).ljust(12),
245 str(totaldeltasize).ljust(12),
246 str(totalblobsize).ljust(9),
246 str(totalblobsize).ljust(9),
247 deltastr
247 deltastr
248 ))
248 ))
249
249
250 bases = {}
250 bases = {}
251 nodes = set()
251 nodes = set()
252 failures = 0
252 failures = 0
253 for filename, node, deltabase, deltalen in dpack.iterentries():
253 for filename, node, deltabase, deltalen in dpack.iterentries():
254 bases[node] = deltabase
254 bases[node] = deltabase
255 if node in nodes:
255 if node in nodes:
256 ui.write(("Bad entry: %s appears twice\n" % short(node)))
256 ui.write(("Bad entry: %s appears twice\n" % short(node)))
257 failures += 1
257 failures += 1
258 nodes.add(node)
258 nodes.add(node)
259 if filename != lastfilename:
259 if filename != lastfilename:
260 printtotals()
260 printtotals()
261 name = '(empty name)' if filename == '' else filename
261 name = '(empty name)' if filename == '' else filename
262 ui.write("%s:\n" % name)
262 ui.write("%s:\n" % name)
263 ui.write("%s%s%s%s\n" % (
263 ui.write("%s%s%s%s\n" % (
264 "Node".ljust(hashlen),
264 "Node".ljust(hashlen),
265 "Delta Base".ljust(hashlen),
265 "Delta Base".ljust(hashlen),
266 "Delta Length".ljust(14),
266 "Delta Length".ljust(14),
267 "Blob Size".ljust(9)))
267 "Blob Size".ljust(9)))
268 lastfilename = filename
268 lastfilename = filename
269 totalblobsize = 0
269 totalblobsize = 0
270 totaldeltasize = 0
270 totaldeltasize = 0
271
271
272 # Metadata could be missing, in which case it will be an empty dict.
272 # Metadata could be missing, in which case it will be an empty dict.
273 meta = dpack.getmeta(filename, node)
273 meta = dpack.getmeta(filename, node)
274 if constants.METAKEYSIZE in meta:
274 if constants.METAKEYSIZE in meta:
275 blobsize = meta[constants.METAKEYSIZE]
275 blobsize = meta[constants.METAKEYSIZE]
276 totaldeltasize += deltalen
276 totaldeltasize += deltalen
277 totalblobsize += blobsize
277 totalblobsize += blobsize
278 else:
278 else:
279 blobsize = "(missing)"
279 blobsize = "(missing)"
280 ui.write("%s %s %s%s\n" % (
280 ui.write("%s %s %s%s\n" % (
281 hashformatter(node),
281 hashformatter(node),
282 hashformatter(deltabase),
282 hashformatter(deltabase),
283 str(deltalen).ljust(14),
283 str(deltalen).ljust(14),
284 blobsize))
284 blobsize))
285
285
286 if filename is not None:
286 if filename is not None:
287 printtotals()
287 printtotals()
288
288
289 failures += _sanitycheck(ui, set(nodes), bases)
289 failures += _sanitycheck(ui, set(nodes), bases)
290 if failures > 1:
290 if failures > 1:
291 ui.warn(("%d failures\n" % failures))
291 ui.warn(("%d failures\n" % failures))
292 return 1
292 return 1
293
293
294 def _sanitycheck(ui, nodes, bases):
294 def _sanitycheck(ui, nodes, bases):
295 """
295 """
296 Does some basic sanity checking on a packfile with ``nodes`` and ``bases``
296 Does some basic sanity checking on a packfile with ``nodes`` and ``bases``
297 (a mapping of node->base):
297 (a mapping of node->base):
298
298
299 - Each deltabase must itself be a node elsewhere in the pack
299 - Each deltabase must itself be a node elsewhere in the pack
300 - There must be no cycles
300 - There must be no cycles
301 """
301 """
302 failures = 0
302 failures = 0
303 for node in nodes:
303 for node in nodes:
304 seen = set()
304 seen = set()
305 current = node
305 current = node
306 deltabase = bases[current]
306 deltabase = bases[current]
307
307
308 while deltabase != nullid:
308 while deltabase != nullid:
309 if deltabase not in nodes:
309 if deltabase not in nodes:
310 ui.warn(("Bad entry: %s has an unknown deltabase (%s)\n" %
310 ui.warn(("Bad entry: %s has an unknown deltabase (%s)\n" %
311 (short(node), short(deltabase))))
311 (short(node), short(deltabase))))
312 failures += 1
312 failures += 1
313 break
313 break
314
314
315 if deltabase in seen:
315 if deltabase in seen:
316 ui.warn(("Bad entry: %s has a cycle (at %s)\n" %
316 ui.warn(("Bad entry: %s has a cycle (at %s)\n" %
317 (short(node), short(deltabase))))
317 (short(node), short(deltabase))))
318 failures += 1
318 failures += 1
319 break
319 break
320
320
321 current = deltabase
321 current = deltabase
322 seen.add(current)
322 seen.add(current)
323 deltabase = bases[current]
323 deltabase = bases[current]
324 # Since ``node`` begins a valid chain, reset/memoize its base to nullid
324 # Since ``node`` begins a valid chain, reset/memoize its base to nullid
325 # so we don't traverse it again.
325 # so we don't traverse it again.
326 bases[node] = nullid
326 bases[node] = nullid
327 return failures
327 return failures
328
328
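A quick illustration of what _sanitycheck flags, using a toy bases mapping that contains a two-node delta cycle. The checkchains helper below is a simplified standalone mirror of the walk above, written for this example only.

nullid = b'\0' * 20   # same sentinel value as mercurial.node.nullid

def checkchains(nodes, bases):
    # Every delta chain must end at nullid without leaving the pack and
    # without revisiting a node.
    failures = 0
    for node in nodes:
        seen = set()
        current = node
        while bases[current] != nullid:
            deltabase = bases[current]
            if deltabase not in nodes or deltabase in seen:
                failures += 1
                break
            seen.add(deltabase)
            current = deltabase
        else:
            bases[node] = nullid    # memoize known-good chains
    return failures

a, b, c = b'a' * 20, b'b' * 20, b'c' * 20
# a -> nullid is fine; b and c delta off each other, forming a cycle.
assert checkchains({a, b, c}, {a: nullid, b: c, c: b}) == 2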
329 def dumpdeltachain(ui, deltachain, **opts):
329 def dumpdeltachain(ui, deltachain, **opts):
330 hashformatter = hex
330 hashformatter = hex
331 hashlen = 40
331 hashlen = 40
332
332
333 lastfilename = None
333 lastfilename = None
334 for filename, node, filename, deltabasenode, delta in deltachain:
334 for filename, node, filename, deltabasenode, delta in deltachain:
335 if filename != lastfilename:
335 if filename != lastfilename:
336 ui.write("\n%s\n" % filename)
336 ui.write("\n%s\n" % filename)
337 lastfilename = filename
337 lastfilename = filename
338 ui.write("%s %s %s %s\n" % (
338 ui.write("%s %s %s %s\n" % (
339 "Node".ljust(hashlen),
339 "Node".ljust(hashlen),
340 "Delta Base".ljust(hashlen),
340 "Delta Base".ljust(hashlen),
341 "Delta SHA1".ljust(hashlen),
341 "Delta SHA1".ljust(hashlen),
342 "Delta Length".ljust(6),
342 "Delta Length".ljust(6),
343 ))
343 ))
344
344
345 ui.write("%s %s %s %s\n" % (
345 ui.write("%s %s %s %s\n" % (
346 hashformatter(node),
346 hashformatter(node),
347 hashformatter(deltabasenode),
347 hashformatter(deltabasenode),
348 nodemod.hex(hashlib.sha1(delta).digest()),
348 nodemod.hex(hashlib.sha1(delta).digest()),
349 len(delta)))
349 len(delta)))
350
350
351 def debughistorypack(ui, path):
351 def debughistorypack(ui, path):
352 if '.hist' in path:
352 if '.hist' in path:
353 path = path[:path.index('.hist')]
353 path = path[:path.index('.hist')]
354 hpack = historypack.historypack(path)
354 hpack = historypack.historypack(path)
355
355
356 lastfilename = None
356 lastfilename = None
357 for entry in hpack.iterentries():
357 for entry in hpack.iterentries():
358 filename, node, p1node, p2node, linknode, copyfrom = entry
358 filename, node, p1node, p2node, linknode, copyfrom = entry
359 if filename != lastfilename:
359 if filename != lastfilename:
360 ui.write("\n%s\n" % filename)
360 ui.write("\n%s\n" % filename)
361 ui.write("%s%s%s%s%s\n" % (
361 ui.write("%s%s%s%s%s\n" % (
362 "Node".ljust(14),
362 "Node".ljust(14),
363 "P1 Node".ljust(14),
363 "P1 Node".ljust(14),
364 "P2 Node".ljust(14),
364 "P2 Node".ljust(14),
365 "Link Node".ljust(14),
365 "Link Node".ljust(14),
366 "Copy From"))
366 "Copy From"))
367 lastfilename = filename
367 lastfilename = filename
368 ui.write("%s %s %s %s %s\n" % (short(node), short(p1node),
368 ui.write("%s %s %s %s %s\n" % (short(node), short(p1node),
369 short(p2node), short(linknode), copyfrom))
369 short(p2node), short(linknode), copyfrom))
370
370
371 def debugwaitonrepack(repo):
371 def debugwaitonrepack(repo):
372 with extutil.flock(repack.repacklockvfs(repo).join('repacklock'), ''):
372 with extutil.flock(repack.repacklockvfs(repo).join('repacklock'), ''):
373 return
373 return
374
374
375 def debugwaitonprefetch(repo):
375 def debugwaitonprefetch(repo):
376 with repo._lock(repo.svfs, "prefetchlock", True, None,
376 with repo._lock(repo.svfs, "prefetchlock", True, None,
377 None, _('prefetching in %s') % repo.origroot):
377 None, _('prefetching in %s') % repo.origroot):
378 pass
378 pass
@@ -1,589 +1,589
1 # fileserverclient.py - client for communicating with the cache process
1 # fileserverclient.py - client for communicating with the cache process
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import io
11 import io
12 import os
12 import os
13 import threading
13 import threading
14 import time
14 import time
15 import zlib
15 import zlib
16
16
17 from mercurial.i18n import _
17 from mercurial.i18n import _
18 from mercurial.node import bin, hex, nullid
18 from mercurial.node import bin, hex, nullid
19 from mercurial import (
19 from mercurial import (
20 error,
20 error,
21 node,
21 node,
22 pycompat,
22 pycompat,
23 revlog,
23 revlog,
24 sshpeer,
24 sshpeer,
25 util,
25 util,
26 wireprotov1peer,
26 wireprotov1peer,
27 )
27 )
28 from mercurial.utils import procutil
28 from mercurial.utils import procutil
29
29
30 from . import (
30 from . import (
31 constants,
31 constants,
32 contentstore,
32 contentstore,
33 metadatastore,
33 metadatastore,
34 )
34 )
35
35
36 _sshv1peer = sshpeer.sshv1peer
36 _sshv1peer = sshpeer.sshv1peer
37
37
38 # Statistics for debugging
38 # Statistics for debugging
39 fetchcost = 0
39 fetchcost = 0
40 fetches = 0
40 fetches = 0
41 fetched = 0
41 fetched = 0
42 fetchmisses = 0
42 fetchmisses = 0
43
43
44 _lfsmod = None
44 _lfsmod = None
45 _downloading = _('downloading')
45 _downloading = _('downloading')
46
46
47 def getcachekey(reponame, file, id):
47 def getcachekey(reponame, file, id):
48 pathhash = node.hex(hashlib.sha1(file).digest())
48 pathhash = node.hex(hashlib.sha1(file).digest())
49 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
49 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
50
50
51 def getlocalkey(file, id):
51 def getlocalkey(file, id):
52 pathhash = node.hex(hashlib.sha1(file).digest())
52 pathhash = node.hex(hashlib.sha1(file).digest())
53 return os.path.join(pathhash, id)
53 return os.path.join(pathhash, id)
54
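getcachekey spreads cached blobs across directories by hashing the file path and splitting the first two hex characters into a subdirectory, so a key looks like reponame/<2 hex>/<38 hex>/<node hex>. A quick demonstration with made-up inputs (hexdigest() is equivalent to the node.hex(...digest()) call used above):

import hashlib
import os

reponame = 'myrepo'                      # hypothetical repo name
file = b'foo/bar.txt'                    # the path is hashed as bytes
id = '1' * 40                            # file node as hex

pathhash = hashlib.sha1(file).hexdigest()
key = os.path.join(reponame, pathhash[:2], pathhash[2:], id)
# e.g. myrepo/<2 hex chars>/<remaining 38 hex chars>/111...111
assert len(pathhash) == 40 and key.count(os.sep) == 3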
54
55 def peersetup(ui, peer):
55 def peersetup(ui, peer):
56
56
57 class remotefilepeer(peer.__class__):
57 class remotefilepeer(peer.__class__):
58 @wireprotov1peer.batchable
58 @wireprotov1peer.batchable
59 def x_rfl_getfile(self, file, node):
59 def x_rfl_getfile(self, file, node):
60 if not self.capable('x_rfl_getfile'):
60 if not self.capable('x_rfl_getfile'):
61 raise error.Abort(
61 raise error.Abort(
62 'configured remotefile server does not support getfile')
62 'configured remotefile server does not support getfile')
63 f = wireprotov1peer.future()
63 f = wireprotov1peer.future()
64 yield {'file': file, 'node': node}, f
64 yield {'file': file, 'node': node}, f
65 code, data = f.value.split('\0', 1)
65 code, data = f.value.split('\0', 1)
66 if int(code):
66 if int(code):
67 raise error.LookupError(file, node, data)
67 raise error.LookupError(file, node, data)
68 yield data
68 yield data
69
69
70 @wireprotov1peer.batchable
70 @wireprotov1peer.batchable
71 def x_rfl_getflogheads(self, path):
71 def x_rfl_getflogheads(self, path):
72 if not self.capable('x_rfl_getflogheads'):
72 if not self.capable('x_rfl_getflogheads'):
73 raise error.Abort('configured remotefile server does not '
73 raise error.Abort('configured remotefile server does not '
74 'support getflogheads')
74 'support getflogheads')
75 f = wireprotov1peer.future()
75 f = wireprotov1peer.future()
76 yield {'path': path}, f
76 yield {'path': path}, f
77 heads = f.value.split('\n') if f.value else []
77 heads = f.value.split('\n') if f.value else []
78 yield heads
78 yield heads
79
79
80 def _updatecallstreamopts(self, command, opts):
80 def _updatecallstreamopts(self, command, opts):
81 if command != 'getbundle':
81 if command != 'getbundle':
82 return
82 return
83 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
83 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
84 not in self.capabilities()):
84 not in self.capabilities()):
85 return
85 return
86 if not util.safehasattr(self, '_localrepo'):
86 if not util.safehasattr(self, '_localrepo'):
87 return
87 return
88 if (constants.SHALLOWREPO_REQUIREMENT
88 if (constants.SHALLOWREPO_REQUIREMENT
89 not in self._localrepo.requirements):
89 not in self._localrepo.requirements):
90 return
90 return
91
91
92 bundlecaps = opts.get('bundlecaps')
92 bundlecaps = opts.get('bundlecaps')
93 if bundlecaps:
93 if bundlecaps:
94 bundlecaps = [bundlecaps]
94 bundlecaps = [bundlecaps]
95 else:
95 else:
96 bundlecaps = []
96 bundlecaps = []
97
97
98 # shallow, includepattern, and excludepattern are a hacky way of
98 # shallow, includepattern, and excludepattern are a hacky way of
99 # carrying over data from the local repo to this getbundle
99 # carrying over data from the local repo to this getbundle
100 # command. We need to do it this way because bundle1 getbundle
100 # command. We need to do it this way because bundle1 getbundle
101 # doesn't provide any other place we can hook in to manipulate
101 # doesn't provide any other place we can hook in to manipulate
102 # getbundle args before it goes across the wire. Once we get rid
102 # getbundle args before it goes across the wire. Once we get rid
103 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
103 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
104 # do this more cleanly.
104 # do this more cleanly.
105 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
105 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
106 if self._localrepo.includepattern:
106 if self._localrepo.includepattern:
107 patterns = '\0'.join(self._localrepo.includepattern)
107 patterns = '\0'.join(self._localrepo.includepattern)
108 includecap = "includepattern=" + patterns
108 includecap = "includepattern=" + patterns
109 bundlecaps.append(includecap)
109 bundlecaps.append(includecap)
110 if self._localrepo.excludepattern:
110 if self._localrepo.excludepattern:
111 patterns = '\0'.join(self._localrepo.excludepattern)
111 patterns = '\0'.join(self._localrepo.excludepattern)
112 excludecap = "excludepattern=" + patterns
112 excludecap = "excludepattern=" + patterns
113 bundlecaps.append(excludecap)
113 bundlecaps.append(excludecap)
114 opts['bundlecaps'] = ','.join(bundlecaps)
114 opts['bundlecaps'] = ','.join(bundlecaps)
115
115
116 def _sendrequest(self, command, args, **opts):
116 def _sendrequest(self, command, args, **opts):
117 self._updatecallstreamopts(command, args)
117 self._updatecallstreamopts(command, args)
118 return super(remotefilepeer, self)._sendrequest(command, args,
118 return super(remotefilepeer, self)._sendrequest(command, args,
119 **opts)
119 **opts)
120
120
121 def _callstream(self, command, **opts):
121 def _callstream(self, command, **opts):
122 supertype = super(remotefilepeer, self)
122 supertype = super(remotefilepeer, self)
123 if not util.safehasattr(supertype, '_sendrequest'):
123 if not util.safehasattr(supertype, '_sendrequest'):
124 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
124 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
125 return super(remotefilepeer, self)._callstream(command, **opts)
125 return super(remotefilepeer, self)._callstream(command, **opts)
126
126
127 peer.__class__ = remotefilepeer
127 peer.__class__ = remotefilepeer
128
128
129 class cacheconnection(object):
129 class cacheconnection(object):
130 """The connection for communicating with the remote cache. Performs
130 """The connection for communicating with the remote cache. Performs
131 gets and sets by communicating with an external process that has the
131 gets and sets by communicating with an external process that has the
132 cache-specific implementation.
132 cache-specific implementation.
133 """
133 """
134 def __init__(self):
134 def __init__(self):
135 self.pipeo = self.pipei = self.pipee = None
135 self.pipeo = self.pipei = self.pipee = None
136 self.subprocess = None
136 self.subprocess = None
137 self.connected = False
137 self.connected = False
138
138
139 def connect(self, cachecommand):
139 def connect(self, cachecommand):
140 if self.pipeo:
140 if self.pipeo:
141 raise error.Abort(_("cache connection already open"))
141 raise error.Abort(_("cache connection already open"))
142 self.pipei, self.pipeo, self.pipee, self.subprocess = \
142 self.pipei, self.pipeo, self.pipee, self.subprocess = \
143 procutil.popen4(cachecommand)
143 procutil.popen4(cachecommand)
144 self.connected = True
144 self.connected = True
145
145
146 def close(self):
146 def close(self):
147 def tryclose(pipe):
147 def tryclose(pipe):
148 try:
148 try:
149 pipe.close()
149 pipe.close()
150 except Exception:
150 except Exception:
151 pass
151 pass
152 if self.connected:
152 if self.connected:
153 try:
153 try:
154 self.pipei.write("exit\n")
154 self.pipei.write("exit\n")
155 except Exception:
155 except Exception:
156 pass
156 pass
157 tryclose(self.pipei)
157 tryclose(self.pipei)
158 self.pipei = None
158 self.pipei = None
159 tryclose(self.pipeo)
159 tryclose(self.pipeo)
160 self.pipeo = None
160 self.pipeo = None
161 tryclose(self.pipee)
161 tryclose(self.pipee)
162 self.pipee = None
162 self.pipee = None
163 try:
163 try:
164 # Wait for process to terminate, making sure to avoid deadlock.
164 # Wait for process to terminate, making sure to avoid deadlock.
165 # See https://docs.python.org/2/library/subprocess.html for
165 # See https://docs.python.org/2/library/subprocess.html for
166 # warnings about wait() and deadlocking.
166 # warnings about wait() and deadlocking.
167 self.subprocess.communicate()
167 self.subprocess.communicate()
168 except Exception:
168 except Exception:
169 pass
169 pass
170 self.subprocess = None
170 self.subprocess = None
171 self.connected = False
171 self.connected = False
172
172
173 def request(self, request, flush=True):
173 def request(self, request, flush=True):
174 if self.connected:
174 if self.connected:
175 try:
175 try:
176 self.pipei.write(request)
176 self.pipei.write(request)
177 if flush:
177 if flush:
178 self.pipei.flush()
178 self.pipei.flush()
179 except IOError:
179 except IOError:
180 self.close()
180 self.close()
181
181
182 def receiveline(self):
182 def receiveline(self):
183 if not self.connected:
183 if not self.connected:
184 return None
184 return None
185 try:
185 try:
186 result = self.pipeo.readline()[:-1]
186 result = self.pipeo.readline()[:-1]
187 if not result:
187 if not result:
188 self.close()
188 self.close()
189 except IOError:
189 except IOError:
190 self.close()
190 self.close()
191
191
192 return result
192 return result
193
193
194 def _getfilesbatch(
194 def _getfilesbatch(
195 remote, receivemissing, progresstick, missed, idmap, batchsize):
195 remote, receivemissing, progresstick, missed, idmap, batchsize):
196 # Over http(s), iterbatch is a streamy method and we can start
196 # Over http(s), iterbatch is a streamy method and we can start
197 # looking at results early. This means we send one (potentially
197 # looking at results early. This means we send one (potentially
198 # large) request, but then we show nice progress as we process
198 # large) request, but then we show nice progress as we process
199 # file results, rather than showing chunks of $batchsize in
199 # file results, rather than showing chunks of $batchsize in
200 # progress.
200 # progress.
201 #
201 #
202 # Over ssh, iterbatch isn't streamy because batch() wasn't
202 # Over ssh, iterbatch isn't streamy because batch() wasn't
203 # explicitly designed as a streaming method. In the future we
203 # explicitly designed as a streaming method. In the future we
204 # should probably introduce a streambatch() method upstream and
204 # should probably introduce a streambatch() method upstream and
205 # use that for this.
205 # use that for this.
206 with remote.commandexecutor() as e:
206 with remote.commandexecutor() as e:
207 futures = []
207 futures = []
208 for m in missed:
208 for m in missed:
209 futures.append(e.callcommand('x_rfl_getfile', {
209 futures.append(e.callcommand('x_rfl_getfile', {
210 'file': idmap[m],
210 'file': idmap[m],
211 'node': m[-40:]
211 'node': m[-40:]
212 }))
212 }))
213
213
214 for i, m in enumerate(missed):
214 for i, m in enumerate(missed):
215 r = futures[i].result()
215 r = futures[i].result()
216 futures[i] = None # release memory
216 futures[i] = None # release memory
217 file_ = idmap[m]
217 file_ = idmap[m]
218 node = m[-40:]
218 node = m[-40:]
219 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
219 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
220 progresstick()
220 progresstick()
221
221
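_getfilesbatch issues every x_rfl_getfile call up front through the command executor and only then walks the futures in order, so one large request streams back while results are processed. The sketch below shows the same issue-all-then-drain shape with the standard library; it uses a local fetch() stand-in rather than a Mercurial peer and is purely illustrative.

from concurrent.futures import ThreadPoolExecutor

def fetch(key):
    return 'data-for-%s' % key        # stand-in for a remote getfile call

missed = ['k1', 'k2', 'k3']
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(fetch, key) for key in missed]
    for i, key in enumerate(missed):
        result = futures[i].result()  # consume in request order
        futures[i] = None             # release memory, as the code above does
        assert result == 'data-for-%s' % key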
222 def _getfiles_optimistic(
222 def _getfiles_optimistic(
223 remote, receivemissing, progresstick, missed, idmap, step):
223 remote, receivemissing, progresstick, missed, idmap, step):
224 remote._callstream("x_rfl_getfiles")
224 remote._callstream("x_rfl_getfiles")
225 i = 0
225 i = 0
226 pipeo = remote._pipeo
226 pipeo = remote._pipeo
227 pipei = remote._pipei
227 pipei = remote._pipei
228 while i < len(missed):
228 while i < len(missed):
229 # issue a batch of requests
229 # issue a batch of requests
230 start = i
230 start = i
231 end = min(len(missed), start + step)
231 end = min(len(missed), start + step)
232 i = end
232 i = end
233 for missingid in missed[start:end]:
233 for missingid in missed[start:end]:
234 # issue new request
234 # issue new request
235 versionid = missingid[-40:]
235 versionid = missingid[-40:]
236 file = idmap[missingid]
236 file = idmap[missingid]
237 sshrequest = "%s%s\n" % (versionid, file)
237 sshrequest = "%s%s\n" % (versionid, file)
238 pipeo.write(sshrequest)
238 pipeo.write(sshrequest)
239 pipeo.flush()
239 pipeo.flush()
240
240
241 # receive batch results
241 # receive batch results
242 for missingid in missed[start:end]:
242 for missingid in missed[start:end]:
243 versionid = missingid[-40:]
243 versionid = missingid[-40:]
244 file = idmap[missingid]
244 file = idmap[missingid]
245 receivemissing(pipei, file, versionid)
245 receivemissing(pipei, file, versionid)
246 progresstick()
246 progresstick()
247
247
248 # End the command
248 # End the command
249 pipeo.write('\n')
249 pipeo.write('\n')
250 pipeo.flush()
250 pipeo.flush()
251
251
252 def _getfiles_threaded(
252 def _getfiles_threaded(
253 remote, receivemissing, progresstick, missed, idmap, step):
253 remote, receivemissing, progresstick, missed, idmap, step):
254 remote._callstream("getfiles")
254 remote._callstream("getfiles")
255 pipeo = remote._pipeo
255 pipeo = remote._pipeo
256 pipei = remote._pipei
256 pipei = remote._pipei
257
257
258 def writer():
258 def writer():
259 for missingid in missed:
259 for missingid in missed:
260 versionid = missingid[-40:]
260 versionid = missingid[-40:]
261 file = idmap[missingid]
261 file = idmap[missingid]
262 sshrequest = "%s%s\n" % (versionid, file)
262 sshrequest = "%s%s\n" % (versionid, file)
263 pipeo.write(sshrequest)
263 pipeo.write(sshrequest)
264 pipeo.flush()
264 pipeo.flush()
265 writerthread = threading.Thread(target=writer)
265 writerthread = threading.Thread(target=writer)
266 writerthread.daemon = True
266 writerthread.daemon = True
267 writerthread.start()
267 writerthread.start()
268
268
269 for missingid in missed:
269 for missingid in missed:
270 versionid = missingid[-40:]
270 versionid = missingid[-40:]
271 file = idmap[missingid]
271 file = idmap[missingid]
272 receivemissing(pipei, file, versionid)
272 receivemissing(pipei, file, versionid)
273 progresstick()
273 progresstick()
274
274
275 writerthread.join()
275 writerthread.join()
276 # End the command
276 # End the command
277 pipeo.write('\n')
277 pipeo.write('\n')
278 pipeo.flush()
278 pipeo.flush()
279
279
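_getfiles_threaded overlaps the request and response streams: a daemon thread writes one "<40-char hex node><filename>\n" line per missing id while the main thread reads each answer in the same order. A toy version of that producer/consumer overlap, using a queue in place of the ssh pipe pair; the filenames and node ids are invented.

import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

pipe = queue.Queue()        # stands in for the ssh pipe pair
missed = [('foo.txt', 'a' * 40), ('bar.txt', 'b' * 40)]

def writer():
    for filename, versionid in missed:
        # Same framing as the getfiles protocol: node hex, then filename.
        pipe.put('%s%s\n' % (versionid, filename))

writerthread = threading.Thread(target=writer)
writerthread.daemon = True
writerthread.start()

# The reader consumes answers in the same order the requests were written.
for filename, versionid in missed:
    line = pipe.get()
    assert line == '%s%s\n' % (versionid, filename)
writerthread.join()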
280 class fileserverclient(object):
280 class fileserverclient(object):
281 """A client for requesting files from the remote file server.
281 """A client for requesting files from the remote file server.
282 """
282 """
283 def __init__(self, repo):
283 def __init__(self, repo):
284 ui = repo.ui
284 ui = repo.ui
285 self.repo = repo
285 self.repo = repo
286 self.ui = ui
286 self.ui = ui
287 self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
287 self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
288 if self.cacheprocess:
288 if self.cacheprocess:
289 self.cacheprocess = util.expandpath(self.cacheprocess)
289 self.cacheprocess = util.expandpath(self.cacheprocess)
290
290
291 # This option causes remotefilelog to pass the full file path to the
291 # This option causes remotefilelog to pass the full file path to the
292 # cacheprocess instead of a hashed key.
292 # cacheprocess instead of a hashed key.
293 self.cacheprocesspasspath = ui.configbool(
293 self.cacheprocesspasspath = ui.configbool(
294 "remotefilelog", "cacheprocess.includepath")
294 "remotefilelog", "cacheprocess.includepath")
295
295
296 self.debugoutput = ui.configbool("remotefilelog", "debug")
296 self.debugoutput = ui.configbool("remotefilelog", "debug")
297
297
298 self.remotecache = cacheconnection()
298 self.remotecache = cacheconnection()
299
299
300 def setstore(self, datastore, historystore, writedata, writehistory):
300 def setstore(self, datastore, historystore, writedata, writehistory):
301 self.datastore = datastore
301 self.datastore = datastore
302 self.historystore = historystore
302 self.historystore = historystore
303 self.writedata = writedata
303 self.writedata = writedata
304 self.writehistory = writehistory
304 self.writehistory = writehistory
305
305
306 def _connect(self):
306 def _connect(self):
307 return self.repo.connectionpool.get(self.repo.fallbackpath)
307 return self.repo.connectionpool.get(self.repo.fallbackpath)
308
308
309 def request(self, fileids):
309 def request(self, fileids):
310 """Takes a list of filename/node pairs and fetches them from the
310 """Takes a list of filename/node pairs and fetches them from the
311 server. Files are stored in the local cache.
311 server. Files are stored in the local cache.
312 A list of nodes that the server couldn't find is returned.
312 A list of nodes that the server couldn't find is returned.
313 If the connection fails, an exception is raised.
313 If the connection fails, an exception is raised.
314 """
314 """
315 if not self.remotecache.connected:
315 if not self.remotecache.connected:
316 self.connect()
316 self.connect()
317 cache = self.remotecache
317 cache = self.remotecache
318 writedata = self.writedata
318 writedata = self.writedata
319
319
320 repo = self.repo
320 repo = self.repo
321 count = len(fileids)
321 count = len(fileids)
322 request = "get\n%d\n" % count
322 request = "get\n%d\n" % count
323 idmap = {}
323 idmap = {}
324 reponame = repo.name
324 reponame = repo.name
325 for file, id in fileids:
325 for file, id in fileids:
326 fullid = getcachekey(reponame, file, id)
326 fullid = getcachekey(reponame, file, id)
327 if self.cacheprocesspasspath:
327 if self.cacheprocesspasspath:
328 request += file + '\0'
328 request += file + '\0'
329 request += fullid + "\n"
329 request += fullid + "\n"
330 idmap[fullid] = file
330 idmap[fullid] = file
331
331
332 cache.request(request)
332 cache.request(request)
333
333
334 total = count
334 total = count
335 self.ui.progress(_downloading, 0, total=count)
335 self.ui.progress(_downloading, 0, total=count)
336
336
337 missed = []
337 missed = []
338 count = 0
338 count = 0
339 while True:
339 while True:
340 missingid = cache.receiveline()
340 missingid = cache.receiveline()
341 if not missingid:
341 if not missingid:
342 missedset = set(missed)
342 missedset = set(missed)
343 for missingid in idmap:
343 for missingid in idmap:
344 if not missingid in missedset:
344 if not missingid in missedset:
345 missed.append(missingid)
345 missed.append(missingid)
346 self.ui.warn(_("warning: cache connection closed early - " +
346 self.ui.warn(_("warning: cache connection closed early - " +
347 "falling back to server\n"))
347 "falling back to server\n"))
348 break
348 break
349 if missingid == "0":
349 if missingid == "0":
350 break
350 break
351 if missingid.startswith("_hits_"):
351 if missingid.startswith("_hits_"):
352 # receive progress reports
352 # receive progress reports
353 parts = missingid.split("_")
353 parts = missingid.split("_")
354 count += int(parts[2])
354 count += int(parts[2])
355 self.ui.progress(_downloading, count, total=total)
355 self.ui.progress(_downloading, count, total=total)
356 continue
356 continue
357
357
358 missed.append(missingid)
358 missed.append(missingid)
359
359
360 global fetchmisses
360 global fetchmisses
361 fetchmisses += len(missed)
361 fetchmisses += len(missed)
362
362
363 count = [total - len(missed)]
363 count = [total - len(missed)]
364 fromcache = count[0]
364 fromcache = count[0]
365 self.ui.progress(_downloading, count[0], total=total)
365 self.ui.progress(_downloading, count[0], total=total)
366 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
366 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
367 count[0], total, hit=count[0], total=total)
367 count[0], total, hit=count[0], total=total)
368
368
369 oldumask = os.umask(0o002)
369 oldumask = os.umask(0o002)
370 try:
370 try:
371 # receive cache misses from master
371 # receive cache misses from master
372 if missed:
372 if missed:
373 def progresstick():
373 def progresstick():
374 count[0] += 1
374 count[0] += 1
375 self.ui.progress(_downloading, count[0], total=total)
375 self.ui.progress(_downloading, count[0], total=total)
376 # When verbose is true, sshpeer prints 'running ssh...'
376 # When verbose is true, sshpeer prints 'running ssh...'
377 # to stdout, which can interfere with some command
377 # to stdout, which can interfere with some command
378 # outputs
378 # outputs
379 verbose = self.ui.verbose
379 verbose = self.ui.verbose
380 self.ui.verbose = False
380 self.ui.verbose = False
381 try:
381 try:
382 with self._connect() as conn:
382 with self._connect() as conn:
383 remote = conn.peer
383 remote = conn.peer
384 if remote.capable(
384 if remote.capable(
385 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
385 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
386 if not isinstance(remote, _sshv1peer):
386 if not isinstance(remote, _sshv1peer):
387 raise error.Abort('remotefilelog requires ssh '
387 raise error.Abort('remotefilelog requires ssh '
388 'servers')
388 'servers')
389 step = self.ui.configint('remotefilelog',
389 step = self.ui.configint('remotefilelog',
390 'getfilesstep')
390 'getfilesstep')
391 getfilestype = self.ui.config('remotefilelog',
391 getfilestype = self.ui.config('remotefilelog',
392 'getfilestype')
392 'getfilestype')
393 if getfilestype == 'threaded':
393 if getfilestype == 'threaded':
394 _getfiles = _getfiles_threaded
394 _getfiles = _getfiles_threaded
395 else:
395 else:
396 _getfiles = _getfiles_optimistic
396 _getfiles = _getfiles_optimistic
397 _getfiles(remote, self.receivemissing, progresstick,
397 _getfiles(remote, self.receivemissing, progresstick,
398 missed, idmap, step)
398 missed, idmap, step)
399 elif remote.capable("x_rfl_getfile"):
399 elif remote.capable("x_rfl_getfile"):
400 if remote.capable('batch'):
400 if remote.capable('batch'):
401 batchdefault = 100
401 batchdefault = 100
402 else:
402 else:
403 batchdefault = 10
403 batchdefault = 10
404 batchsize = self.ui.configint(
404 batchsize = self.ui.configint(
405 'remotefilelog', 'batchsize', batchdefault)
405 'remotefilelog', 'batchsize', batchdefault)
406 _getfilesbatch(
406 _getfilesbatch(
407 remote, self.receivemissing, progresstick,
407 remote, self.receivemissing, progresstick,
408 missed, idmap, batchsize)
408 missed, idmap, batchsize)
409 else:
409 else:
410 raise error.Abort("configured remotefilelog server"
410 raise error.Abort("configured remotefilelog server"
411 " does not support remotefilelog")
411 " does not support remotefilelog")
412
412
413 self.ui.log("remotefilefetchlog",
413 self.ui.log("remotefilefetchlog",
414 "Success\n",
414 "Success\n",
415 fetched_files = count[0] - fromcache,
415 fetched_files = count[0] - fromcache,
416 total_to_fetch = total - fromcache)
416 total_to_fetch = total - fromcache)
417 except Exception:
417 except Exception:
418 self.ui.log("remotefilefetchlog",
418 self.ui.log("remotefilefetchlog",
419 "Fail\n",
419 "Fail\n",
420 fetched_files = count[0] - fromcache,
420 fetched_files = count[0] - fromcache,
421 total_to_fetch = total - fromcache)
421 total_to_fetch = total - fromcache)
422 raise
422 raise
423 finally:
423 finally:
424 self.ui.verbose = verbose
424 self.ui.verbose = verbose
425 # send to memcache
425 # send to memcache
426 count[0] = len(missed)
426 count[0] = len(missed)
427 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
427 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
428 cache.request(request)
428 cache.request(request)
429
429
430 self.ui.progress(_downloading, None)
430 self.ui.progress(_downloading, None)
431
431
432 # mark ourselves as a user of this cache
432 # mark ourselves as a user of this cache
433 writedata.markrepo(self.repo.path)
433 writedata.markrepo(self.repo.path)
434 finally:
434 finally:
435 os.umask(oldumask)
435 os.umask(oldumask)
436
436
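The exchange with the cache process in request() above is line oriented: the client sends "get\n<count>\n<key>\n...", then reads back one line per missing key, "_hits_<n>_..." progress lines, and a terminating "0". A minimal parser for that response stream (the cache keys here are invented):

def parsecacheresponse(lines):
    # Returns (missing keys, number of reported hits), mirroring the
    # receive loop in fileserverclient.request above.
    missed = []
    hits = 0
    for line in lines:
        if line == "0":            # end of response
            break
        if line.startswith("_hits_"):
            hits += int(line.split("_")[2])
            continue
        missed.append(line)
    return missed, hits

response = ["myrepo/ab/cd/1111", "_hits_3_", "myrepo/ef/01/2222", "0"]
assert parsecacheresponse(response) == (["myrepo/ab/cd/1111",
                                         "myrepo/ef/01/2222"], 3)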
437 def receivemissing(self, pipe, filename, node):
437 def receivemissing(self, pipe, filename, node):
438 line = pipe.readline()[:-1]
438 line = pipe.readline()[:-1]
439 if not line:
439 if not line:
440 raise error.ResponseError(_("error downloading file contents:"),
440 raise error.ResponseError(_("error downloading file contents:"),
441 _("connection closed early"))
441 _("connection closed early"))
442 size = int(line)
442 size = int(line)
443 data = pipe.read(size)
443 data = pipe.read(size)
444 if len(data) != size:
444 if len(data) != size:
445 raise error.ResponseError(_("error downloading file contents:"),
445 raise error.ResponseError(_("error downloading file contents:"),
446 _("only received %s of %s bytes")
446 _("only received %s of %s bytes")
447 % (len(data), size))
447 % (len(data), size))
448
448
449 self.writedata.addremotefilelognode(filename, bin(node),
449 self.writedata.addremotefilelognode(filename, bin(node),
450 zlib.decompress(data))
450 zlib.decompress(data))
451
451
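receivemissing expects each file to arrive as a decimal size line followed by exactly that many bytes of zlib-compressed data. Here is a round trip of that framing over an in-memory pipe, with toy content and no real server involved:

import io
import zlib

content = b'file contents for the example'
payload = zlib.compress(content)
pipe = io.BytesIO(b'%d\n%s' % (len(payload), payload))

size = int(pipe.readline()[:-1])     # strip the trailing newline
data = pipe.read(size)
assert len(data) == size
assert zlib.decompress(data) == content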
452 def connect(self):
452 def connect(self):
453 if self.cacheprocess:
453 if self.cacheprocess:
454 cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
454 cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
455 self.remotecache.connect(cmd)
455 self.remotecache.connect(cmd)
456 else:
456 else:
457 # If no cache process is specified, we fake one that always
457 # If no cache process is specified, we fake one that always
458 # returns cache misses. This enables tests to run easily
458 # returns cache misses. This enables tests to run easily
459 # and may eventually allow us to be a drop-in replacement
459 # and may eventually allow us to be a drop-in replacement
460 # for the largefiles extension.
460 # for the largefiles extension.
461 class simplecache(object):
461 class simplecache(object):
462 def __init__(self):
462 def __init__(self):
463 self.missingids = []
463 self.missingids = []
464 self.connected = True
464 self.connected = True
465
465
466 def close(self):
466 def close(self):
467 pass
467 pass
468
468
469 def request(self, value, flush=True):
469 def request(self, value, flush=True):
470 lines = value.split("\n")
470 lines = value.split("\n")
471 if lines[0] != "get":
471 if lines[0] != "get":
472 return
472 return
473 self.missingids = lines[2:-1]
473 self.missingids = lines[2:-1]
474 self.missingids.append('0')
474 self.missingids.append('0')
475
475
476 def receiveline(self):
476 def receiveline(self):
477 if len(self.missingids) > 0:
477 if len(self.missingids) > 0:
478 return self.missingids.pop(0)
478 return self.missingids.pop(0)
479 return None
479 return None
480
480
481 self.remotecache = simplecache()
481 self.remotecache = simplecache()
482
482
483 def close(self):
483 def close(self):
484 if fetches:
484 if fetches:
485 msg = ("%s files fetched over %d fetches - " +
485 msg = ("%d files fetched over %d fetches - " +
486 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
486 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
487 fetched,
487 fetched,
488 fetches,
488 fetches,
489 fetchmisses,
489 fetchmisses,
490 float(fetched - fetchmisses) / float(fetched) * 100.0,
490 float(fetched - fetchmisses) / float(fetched) * 100.0,
491 fetchcost)
491 fetchcost)
492 if self.debugoutput:
492 if self.debugoutput:
493 self.ui.warn(msg)
493 self.ui.warn(msg)
494 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
494 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
495 remotefilelogfetched=fetched,
495 remotefilelogfetched=fetched,
496 remotefilelogfetches=fetches,
496 remotefilelogfetches=fetches,
497 remotefilelogfetchmisses=fetchmisses,
497 remotefilelogfetchmisses=fetchmisses,
498 remotefilelogfetchtime=fetchcost * 1000)
498 remotefilelogfetchtime=fetchcost * 1000)
499
499
500 if self.remotecache.connected:
500 if self.remotecache.connected:
501 self.remotecache.close()
501 self.remotecache.close()
502
502
503 def prefetch(self, fileids, force=False, fetchdata=True,
503 def prefetch(self, fileids, force=False, fetchdata=True,
504 fetchhistory=False):
504 fetchhistory=False):
505 """downloads the given file versions to the cache
505 """downloads the given file versions to the cache
506 """
506 """
507 repo = self.repo
507 repo = self.repo
508 idstocheck = []
508 idstocheck = []
509 for file, id in fileids:
509 for file, id in fileids:
            # hack
            # - we don't use .hgtags
            # - workingctx produces ids with length 42,
            #   which we skip since they aren't in any cache
            if (file == '.hgtags' or len(id) == 42
                or not repo.shallowmatch(file)):
                continue

            idstocheck.append((file, bin(id)))

        datastore = self.datastore
        historystore = self.historystore
        if force:
            datastore = contentstore.unioncontentstore(*repo.shareddatastores)
            historystore = metadatastore.unionmetadatastore(
                *repo.sharedhistorystores)

        missingids = set()
        if fetchdata:
            missingids.update(datastore.getmissing(idstocheck))
        if fetchhistory:
            missingids.update(historystore.getmissing(idstocheck))

        # partition missing nodes into nullid and not-nullid so we can
        # warn about this filtering potentially shadowing bugs.
        nullids = len([None for unused, id in missingids if id == nullid])
        if nullids:
            missingids = [(f, id) for f, id in missingids if id != nullid]
            repo.ui.develwarn(
                ('remotefilelog not fetching %d null revs'
                 ' - this is likely hiding bugs' % nullids),
                config='remotefilelog-ext')
        if missingids:
            global fetches, fetched, fetchcost
            fetches += 1

            # We want to be able to detect excess individual file downloads, so
            # let's log that information for debugging.
            if fetches >= 15 and fetches < 18:
                if fetches == 15:
                    fetchwarning = self.ui.config('remotefilelog',
                                                  'fetchwarning')
                    if fetchwarning:
                        self.ui.warn(fetchwarning + '\n')
                self.logstacktrace()
            missingids = [(file, hex(id)) for file, id in missingids]
            fetched += len(missingids)
            start = time.time()
            missingids = self.request(missingids)
            if missingids:
                raise error.Abort(_("unable to download %d files") %
                                  len(missingids))
            fetchcost += time.time() - start
            self._lfsprefetch(fileids)

    def _lfsprefetch(self, fileids):
        if not _lfsmod or not util.safehasattr(
                self.repo.svfs, 'lfslocalblobstore'):
            return
        if not _lfsmod.wrapper.candownload(self.repo):
            return
        pointers = []
        store = self.repo.svfs.lfslocalblobstore
        for file, id in fileids:
            node = bin(id)
            rlog = self.repo.file(file)
            if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                text = rlog.revision(node, raw=True)
                p = _lfsmod.pointer.deserialize(text)
                oid = p.oid()
                if not store.has(oid):
                    pointers.append(p)
        if len(pointers) > 0:
            self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
            assert all(store.has(p.oid()) for p in pointers)

    def logstacktrace(self):
        import traceback
        self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
                    ''.join(traceback.format_stack()))
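# Illustrative sketch (not from the original file): prefetch() above takes
# `fileids` as (path, hex nodeid) pairs; .hgtags, 42-character workingctx ids
# and paths outside the shallow match are skipped, and ids that resolve to
# nullid are filtered out before anything is requested. Assuming `client` is
# an instance of the class these methods belong to, a caller might look like:
#
#     fileids = [('foo/bar.txt', '0123456789abcdef0123456789abcdef01234567')]
#     client.prefetch(fileids, fetchdata=True, fetchhistory=False)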
@@ -1,406 +1,406
# remotefilelogserver.py - server logic for a remotefilelog server
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import

import errno
import os
import stat
import time
import zlib

from mercurial.i18n import _
from mercurial.node import bin, hex, nullid
from mercurial import (
    changegroup,
    changelog,
    context,
    error,
    extensions,
    match,
    store,
    streamclone,
    util,
    wireprotoserver,
    wireprototypes,
    wireprotov1server,
)
from . import (
    constants,
    shallowutil,
)

_sshv1server = wireprotoserver.sshv1protocolhandler

def setupserver(ui, repo):
    """Sets up a normal Mercurial repo so it can serve files to shallow repos.
    """
    onetimesetup(ui)

    # don't send files to shallow clients during pulls
    def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source,
                      *args, **kwargs):
        caps = self._bundlecaps or []
        if constants.BUNDLE2_CAPABLITY in caps:
            # only send files that don't match the specified patterns
            includepattern = None
            excludepattern = None
            for cap in (self._bundlecaps or []):
                if cap.startswith("includepattern="):
                    includepattern = cap[len("includepattern="):].split('\0')
                elif cap.startswith("excludepattern="):
                    excludepattern = cap[len("excludepattern="):].split('\0')

            m = match.always(repo.root, '')
            if includepattern or excludepattern:
                m = match.match(repo.root, '', None,
                                includepattern, excludepattern)

            changedfiles = list([f for f in changedfiles if not m(f)])
        return orig(self, changedfiles, linknodes, commonrevs, source,
                    *args, **kwargs)

    extensions.wrapfunction(
        changegroup.cgpacker, 'generatefiles', generatefiles)

onetime = False
def onetimesetup(ui):
    """Configures the wireprotocol for both clients and servers.
    """
    global onetime
    if onetime:
        return
    onetime = True

    # support file content requests
    wireprotov1server.wireprotocommand(
        'x_rfl_getflogheads', 'path', permission='pull')(getflogheads)
    wireprotov1server.wireprotocommand(
        'x_rfl_getfiles', '', permission='pull')(getfiles)
    wireprotov1server.wireprotocommand(
        'x_rfl_getfile', 'file node', permission='pull')(getfile)

    class streamstate(object):
        match = None
        shallowremote = False
        noflatmf = False
    state = streamstate()

    def stream_out_shallow(repo, proto, other):
        includepattern = None
        excludepattern = None
        raw = other.get('includepattern')
        if raw:
            includepattern = raw.split('\0')
        raw = other.get('excludepattern')
        if raw:
            excludepattern = raw.split('\0')

        oldshallow = state.shallowremote
        oldmatch = state.match
        oldnoflatmf = state.noflatmf
        try:
            state.shallowremote = True
            state.match = match.always(repo.root, '')
            state.noflatmf = other.get('noflatmanifest') == 'True'
            if includepattern or excludepattern:
                state.match = match.match(repo.root, '', None,
                                          includepattern, excludepattern)
            streamres = wireprotov1server.stream(repo, proto)

            # Force the first value to execute, so the file list is computed
            # within the try/finally scope
            first = next(streamres.gen)
            second = next(streamres.gen)
            def gen():
                yield first
                yield second
                for value in streamres.gen:
                    yield value
            return wireprototypes.streamres(gen())
        finally:
            state.shallowremote = oldshallow
            state.match = oldmatch
            state.noflatmf = oldnoflatmf

    wireprotov1server.commands['stream_out_shallow'] = (stream_out_shallow, '*')

    # don't clone filelogs to shallow clients
    def _walkstreamfiles(orig, repo, matcher=None):
        if state.shallowremote:
            # if we are shallow ourselves, stream our local commits
            if shallowutil.isenabled(repo):
                striplen = len(repo.store.path) + 1
                readdir = repo.store.rawvfs.readdir
                visit = [os.path.join(repo.store.path, 'data')]
                while visit:
                    p = visit.pop()
                    for f, kind, st in readdir(p, stat=True):
                        fp = p + '/' + f
                        if kind == stat.S_IFREG:
                            if not fp.endswith('.i') and not fp.endswith('.d'):
                                n = util.pconvert(fp[striplen:])
                                yield (store.decodedir(n), n, st.st_size)
                        if kind == stat.S_IFDIR:
                            visit.append(fp)

            if 'treemanifest' in repo.requirements:
                for (u, e, s) in repo.store.datafiles():
                    if (u.startswith('meta/') and
                        (u.endswith('.i') or u.endswith('.d'))):
                        yield (u, e, s)

            # Return .d and .i files that do not match the shallow pattern
            match = state.match
            if match and not match.always():
                for (u, e, s) in repo.store.datafiles():
                    f = u[5:-2]  # trim data/... and .i/.d
                    if not state.match(f):
                        yield (u, e, s)

            for x in repo.store.topfiles():
                if state.noflatmf and x[0][:11] == '00manifest.':
                    continue
                yield x

        elif shallowutil.isenabled(repo):
            # don't allow cloning from a shallow repo to a full repo
            # since it would require fetching every version of every
            # file in order to create the revlogs.
            raise error.Abort(_("Cannot clone from a shallow repo "
                                "to a full repo."))
        else:
            for x in orig(repo, matcher):
                yield x

    extensions.wrapfunction(streamclone, '_walkstreamfiles', _walkstreamfiles)

    # expose remotefilelog capabilities
    def _capabilities(orig, repo, proto):
        caps = orig(repo, proto)
        if (shallowutil.isenabled(repo) or ui.configbool('remotefilelog',
                                                         'server')):
            if isinstance(proto, _sshv1server):
                # legacy getfiles method which only works over ssh
                caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
            caps.append('x_rfl_getflogheads')
            caps.append('x_rfl_getfile')
        return caps
    extensions.wrapfunction(wireprotov1server, '_capabilities', _capabilities)

    def _adjustlinkrev(orig, self, *args, **kwargs):
        # When generating file blobs, taking the real path is too slow on large
        # repos, so force it to just return the linkrev directly.
        repo = self._repo
        if util.safehasattr(repo, 'forcelinkrev') and repo.forcelinkrev:
            return self._filelog.linkrev(self._filelog.rev(self._filenode))
        return orig(self, *args, **kwargs)

    extensions.wrapfunction(
        context.basefilectx, '_adjustlinkrev', _adjustlinkrev)

    def _iscmd(orig, cmd):
        if cmd == 'x_rfl_getfiles':
            return False
        return orig(cmd)

    extensions.wrapfunction(wireprotoserver, 'iscmd', _iscmd)

def _loadfileblob(repo, cachepath, path, node):
    filecachepath = os.path.join(cachepath, path, hex(node))
    if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
        filectx = repo.filectx(path, fileid=node)
        if filectx.node() == nullid:
            repo.changelog = changelog.changelog(repo.svfs)
            filectx = repo.filectx(path, fileid=node)

        text = createfileblob(filectx)
        # TODO configurable compression engines
        text = zlib.compress(text)

        # everything should be user & group read/writable
        oldumask = os.umask(0o002)
        try:
            dirname = os.path.dirname(filecachepath)
            if not os.path.exists(dirname):
                try:
                    os.makedirs(dirname)
                except OSError as ex:
                    if ex.errno != errno.EEXIST:
                        raise

            f = None
            try:
-                f = util.atomictempfile(filecachepath, "w")
+                f = util.atomictempfile(filecachepath, "wb")
                f.write(text)
            except (IOError, OSError):
                # Don't abort if the user only has permission to read,
                # and not write.
                pass
            finally:
                if f:
                    f.close()
        finally:
            os.umask(oldumask)
    else:
-        with open(filecachepath, "r") as f:
+        with open(filecachepath, "rb") as f:
            text = f.read()
    return text

def getflogheads(repo, proto, path):
    """A server api for requesting a filelog's heads
    """
    flog = repo.file(path)
    heads = flog.heads()
    return '\n'.join((hex(head) for head in heads if head != nullid))

def getfile(repo, proto, file, node):
    """A server api for requesting a particular version of a file. Can be used
    in batches to request many files at once. The return protocol is:
    <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
    non-zero for an error.

    data is a compressed blob with revlog flag and ancestors information. See
    createfileblob for its content.
    """
    if shallowutil.isenabled(repo):
        return '1\0' + _('cannot fetch remote files from shallow repo')
    cachepath = repo.ui.config("remotefilelog", "servercachepath")
    if not cachepath:
        cachepath = os.path.join(repo.path, "remotefilelogcache")
    node = bin(node.strip())
    if node == nullid:
        return '0\0'
    return '0\0' + _loadfileblob(repo, cachepath, file, node)
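# Illustrative sketch (not part of the original module): a caller of the
# x_rfl_getfile command above can split the reply on the first NUL byte, as
# described in getfile()'s docstring; a '0' code means the payload is the
# zlib-compressed file blob produced by _loadfileblob():
#
#     def parsegetfilereply(raw):
#         code, payload = raw.split('\0', 1)
#         if code != '0':
#             raise error.Abort(payload)
#         return zlib.decompress(payload)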
def getfiles(repo, proto):
    """A server api for requesting particular versions of particular files.
    """
    if shallowutil.isenabled(repo):
        raise error.Abort(_('cannot fetch remote files from shallow repo'))
    if not isinstance(proto, _sshv1server):
        raise error.Abort(_('cannot fetch remote files over non-ssh protocol'))

    def streamer():
        fin = proto._fin

        cachepath = repo.ui.config("remotefilelog", "servercachepath")
        if not cachepath:
            cachepath = os.path.join(repo.path, "remotefilelogcache")

        while True:
            request = fin.readline()[:-1]
            if not request:
                break

            node = bin(request[:40])
            if node == nullid:
                yield '0\n'
                continue

            path = request[40:]

            text = _loadfileblob(repo, cachepath, path, node)

            yield '%d\n%s' % (len(text), text)

            # it would be better to only flush after processing a whole batch
            # but currently we don't know if there are more requests coming
            proto._fout.flush()
    return wireprototypes.streamres(streamer())
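# Illustrative sketch (not part of the original module): the legacy
# x_rfl_getfiles stream above reads newline-terminated requests of the form
# '<40-char hex node><path>' and answers each one with '<size>\n<blob>'
# (size 0 for nullid). A hypothetical client loop over the ssh pipes
# (`fin`/`fout` are assumptions) could look like:
#
#     for path, hexnode in wanted:
#         fout.write('%s%s\n' % (hexnode, path))
#         fout.flush()
#         size = int(fin.readline()[:-1])
#         blob = fin.read(size)    # zlib-compressed blob, empty for nullid
#     fout.write('\n')             # an empty request ends the batch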
def createfileblob(filectx):
    """
    format:
        v0:
            str(len(rawtext)) + '\0' + rawtext + ancestortext
        v1:
            'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
            metalist := metalist + '\n' + meta | meta
                meta := sizemeta | flagmeta
            sizemeta := METAKEYSIZE + str(len(rawtext))
            flagmeta := METAKEYFLAG + str(flag)

    note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
    length of 1.
    """
    flog = filectx.filelog()
    frev = filectx.filerev()
    revlogflags = flog._revlog.flags(frev)
    if revlogflags == 0:
        # normal files
        text = filectx.data()
    else:
        # lfs, read raw revision data
        text = flog.revision(frev, raw=True)

    repo = filectx._repo

    ancestors = [filectx]

    try:
        repo.forcelinkrev = True
        ancestors.extend([f for f in filectx.ancestors()])

        ancestortext = ""
        for ancestorctx in ancestors:
            parents = ancestorctx.parents()
            p1 = nullid
            p2 = nullid
            if len(parents) > 0:
                p1 = parents[0].filenode()
            if len(parents) > 1:
                p2 = parents[1].filenode()

            copyname = ""
            rename = ancestorctx.renamed()
            if rename:
                copyname = rename[0]
            linknode = ancestorctx.node()
            ancestortext += "%s%s%s%s%s\0" % (
                ancestorctx.filenode(), p1, p2, linknode,
                copyname)
    finally:
        repo.forcelinkrev = False

    header = shallowutil.buildfileblobheader(len(text), revlogflags)

    return "%s\0%s%s" % (header, text, ancestortext)
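# Illustrative sketch (not part of the original module): undoing
# createfileblob() for a v1 blob, following the docstring above. The
# one-character METAKEYSIZE/METAKEYFLAG keys come from the extension's
# constants and are not spelled out here:
#
#     header, rest = blob.split('\0', 1)
#     assert header.startswith('v1\n')
#     meta = dict((line[:1], line[1:]) for line in header.split('\n')[1:])
#     size = int(meta[METAKEYSIZE])          # sizemeta is mandatory
#     rawtext, ancestortext = rest[:size], rest[size:]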
def gcserver(ui, repo):
    if not repo.ui.configbool("remotefilelog", "server"):
        return

    neededfiles = set()
    heads = repo.revs("heads(tip~25000:) - null")

    cachepath = repo.vfs.join("remotefilelogcache")
    for head in heads:
        mf = repo[head].manifest()
        for filename, filenode in mf.iteritems():
            filecachepath = os.path.join(cachepath, filename, hex(filenode))
            neededfiles.add(filecachepath)

    # delete unneeded older files
    days = repo.ui.configint("remotefilelog", "serverexpiration")
    expiration = time.time() - (days * 24 * 60 * 60)

    _removing = _("removing old server cache")
    count = 0
    ui.progress(_removing, count, unit="files")
    for root, dirs, files in os.walk(cachepath):
        for file in files:
            filepath = os.path.join(root, file)
            count += 1
            ui.progress(_removing, count, unit="files")
            if filepath in neededfiles:
                continue

            stat = os.stat(filepath)
            if stat.st_mtime < expiration:
                os.remove(filepath)

    ui.progress(_removing, None)