py3: don't use dict.iterkeys()...
Pulkit Goyal - r40649:9769e0f6 default
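The change replaces dict.iterkeys() with plain dict iteration: iterkeys() exists only on Python 2 dictionaries, while iterating a dict directly (or passing it to sorted()) yields its keys on both Python 2 and Python 3, so the call can simply be dropped. A minimal sketch of the pattern, using made-up values rather than anything from the diff:

    entries = {'node-b': 2, 'node-a': 1}

    # Python 2 only -- AttributeError on Python 3:
    #     for node in sorted(entries.iterkeys()):
    #         ...

    # Works the same on Python 2 and 3; iterating a dict yields its keys:
    for node in sorted(entries):
        print(node)              # node-a, node-b

    for missingid in entries:    # iterkeys() on py2, a keys view on py3
        print(missingid)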
@@ -1,539 +1,539 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import collections
3 import collections
4 import errno
4 import errno
5 import hashlib
5 import hashlib
6 import mmap
6 import mmap
7 import os
7 import os
8 import struct
8 import struct
9 import time
9 import time
10
10
11 from mercurial.i18n import _
11 from mercurial.i18n import _
12 from mercurial import (
12 from mercurial import (
13 policy,
13 policy,
14 pycompat,
14 pycompat,
15 util,
15 util,
16 vfs as vfsmod,
16 vfs as vfsmod,
17 )
17 )
18 from . import shallowutil
18 from . import shallowutil
19
19
20 osutil = policy.importmod(r'osutil')
20 osutil = policy.importmod(r'osutil')
21
21
22 # The pack version supported by this implementation. This will need to be
22 # The pack version supported by this implementation. This will need to be
23 # rev'd whenever the byte format changes. Ex: changing the fanout prefix,
23 # rev'd whenever the byte format changes. Ex: changing the fanout prefix,
24 # changing any of the int sizes, changing the delta algorithm, etc.
24 # changing any of the int sizes, changing the delta algorithm, etc.
25 PACKVERSIONSIZE = 1
25 PACKVERSIONSIZE = 1
26 INDEXVERSIONSIZE = 2
26 INDEXVERSIONSIZE = 2
27
27
28 FANOUTSTART = INDEXVERSIONSIZE
28 FANOUTSTART = INDEXVERSIONSIZE
29
29
30 # Constant that indicates a fanout table entry hasn't been filled in. (This does
30 # Constant that indicates a fanout table entry hasn't been filled in. (This does
31 # not get serialized)
31 # not get serialized)
32 EMPTYFANOUT = -1
32 EMPTYFANOUT = -1
33
33
34 # The fanout prefix is the number of bytes that can be addressed by the fanout
34 # The fanout prefix is the number of bytes that can be addressed by the fanout
35 # table. Example: a fanout prefix of 1 means we use the first byte of a hash to
35 # table. Example: a fanout prefix of 1 means we use the first byte of a hash to
36 # look in the fanout table (which will be 2^8 entries long).
36 # look in the fanout table (which will be 2^8 entries long).
37 SMALLFANOUTPREFIX = 1
37 SMALLFANOUTPREFIX = 1
38 LARGEFANOUTPREFIX = 2
38 LARGEFANOUTPREFIX = 2
39
39
40 # The number of entries in the index at which point we switch to a large fanout.
40 # The number of entries in the index at which point we switch to a large fanout.
41 # It is chosen to balance the linear scan through a sparse fanout, with the
41 # It is chosen to balance the linear scan through a sparse fanout, with the
42 # size of the bisect in the actual index.
42 # size of the bisect in the actual index.
43 # 2^16 / 8 was chosen because it trades off (1 step fanout scan + 5 step
43 # 2^16 / 8 was chosen because it trades off (1 step fanout scan + 5 step
44 # bisect) with (8 step fanout scan + 1 step bisect)
44 # bisect) with (8 step fanout scan + 1 step bisect)
45 # 5 step bisect = log(2^16 / 8 / 255) # fanout
45 # 5 step bisect = log(2^16 / 8 / 255) # fanout
46 # 8 step fanout scan = 2^16 / (2^16 / 8) # fanout space divided by entries
46 # 8 step fanout scan = 2^16 / (2^16 / 8) # fanout space divided by entries
47 SMALLFANOUTCUTOFF = 2**16 / 8
47 SMALLFANOUTCUTOFF = 2**16 / 8
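In concrete numbers: 2^16 / 8 = 8192 entries. At that size a small (2^8-slot) fanout averages about 8192 / 256 = 32 entries per slot, so the bisect within a slot costs roughly log2(32) = 5 steps; spread across the 2^16-slot large fanout, 8192 entries leave about 2^16 / 8192 = 8 slots between occupied ones, hence the ~8 step linear scan. (Note that on Python 3 the expression 2**16 / 8 evaluates to the float 8192.0; 2**16 // 8 would keep it an integer, though the size comparison against len(self.entries) below works either way.)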
48
48
49 # The amount of time to wait between checking for new packs. This prevents an
49 # The amount of time to wait between checking for new packs. This prevents an
50 # exception when data is moved to a new pack after the process has already
50 # exception when data is moved to a new pack after the process has already
51 # loaded the pack list.
51 # loaded the pack list.
52 REFRESHRATE = 0.1
52 REFRESHRATE = 0.1
53
53
54 if pycompat.isposix:
54 if pycompat.isposix:
55 # With glibc 2.7+ the 'e' flag uses O_CLOEXEC when opening.
55 # With glibc 2.7+ the 'e' flag uses O_CLOEXEC when opening.
56 # The 'e' flag will be ignored on older versions of glibc.
56 # The 'e' flag will be ignored on older versions of glibc.
57 PACKOPENMODE = 'rbe'
57 PACKOPENMODE = 'rbe'
58 else:
58 else:
59 PACKOPENMODE = 'rb'
59 PACKOPENMODE = 'rb'
60
60
61 class _cachebackedpacks(object):
61 class _cachebackedpacks(object):
62 def __init__(self, packs, cachesize):
62 def __init__(self, packs, cachesize):
63 self._packs = set(packs)
63 self._packs = set(packs)
64 self._lrucache = util.lrucachedict(cachesize)
64 self._lrucache = util.lrucachedict(cachesize)
65 self._lastpack = None
65 self._lastpack = None
66
66
67 # Avoid cold start of the cache by populating the most recent packs
67 # Avoid cold start of the cache by populating the most recent packs
68 # in the cache.
68 # in the cache.
69 for i in reversed(range(min(cachesize, len(packs)))):
69 for i in reversed(range(min(cachesize, len(packs)))):
70 self._movetofront(packs[i])
70 self._movetofront(packs[i])
71
71
72 def _movetofront(self, pack):
72 def _movetofront(self, pack):
73 # This effectively makes pack the first entry in the cache.
73 # This effectively makes pack the first entry in the cache.
74 self._lrucache[pack] = True
74 self._lrucache[pack] = True
75
75
76 def _registerlastpackusage(self):
76 def _registerlastpackusage(self):
77 if self._lastpack is not None:
77 if self._lastpack is not None:
78 self._movetofront(self._lastpack)
78 self._movetofront(self._lastpack)
79 self._lastpack = None
79 self._lastpack = None
80
80
81 def add(self, pack):
81 def add(self, pack):
82 self._registerlastpackusage()
82 self._registerlastpackusage()
83
83
84 # This method will mostly be called when packs are not in cache.
84 # This method will mostly be called when packs are not in cache.
85 # Therefore, add the pack to the cache.
85 # Therefore, add the pack to the cache.
86 self._movetofront(pack)
86 self._movetofront(pack)
87 self._packs.add(pack)
87 self._packs.add(pack)
88
88
89 def __iter__(self):
89 def __iter__(self):
90 self._registerlastpackusage()
90 self._registerlastpackusage()
91
91
92 # Cache iteration is based on LRU.
92 # Cache iteration is based on LRU.
93 for pack in self._lrucache:
93 for pack in self._lrucache:
94 self._lastpack = pack
94 self._lastpack = pack
95 yield pack
95 yield pack
96
96
97 cachedpacks = set(pack for pack in self._lrucache)
97 cachedpacks = set(pack for pack in self._lrucache)
98 # Yield packs that are not in the cache.
98 # Yield packs that are not in the cache.
99 for pack in self._packs - cachedpacks:
99 for pack in self._packs - cachedpacks:
100 self._lastpack = pack
100 self._lastpack = pack
101 yield pack
101 yield pack
102
102
103 # Data not found in any pack.
103 # Data not found in any pack.
104 self._lastpack = None
104 self._lastpack = None
105
105
106 class basepackstore(object):
106 class basepackstore(object):
107 # Default cache size limit for the pack files.
107 # Default cache size limit for the pack files.
108 DEFAULTCACHESIZE = 100
108 DEFAULTCACHESIZE = 100
109
109
110 def __init__(self, ui, path):
110 def __init__(self, ui, path):
111 self.ui = ui
111 self.ui = ui
112 self.path = path
112 self.path = path
113
113
114 # lastrefresh is 0 so we'll immediately check for new packs on the first
114 # lastrefresh is 0 so we'll immediately check for new packs on the first
115 # failure.
115 # failure.
116 self.lastrefresh = 0
116 self.lastrefresh = 0
117
117
118 packs = []
118 packs = []
119 for filepath, __, __ in self._getavailablepackfilessorted():
119 for filepath, __, __ in self._getavailablepackfilessorted():
120 try:
120 try:
121 pack = self.getpack(filepath)
121 pack = self.getpack(filepath)
122 except Exception as ex:
122 except Exception as ex:
123 # An exception may be thrown if the pack file is corrupted
123 # An exception may be thrown if the pack file is corrupted
124 # somehow. Log a warning but keep going in this case, just
124 # somehow. Log a warning but keep going in this case, just
125 # skipping this pack file.
125 # skipping this pack file.
126 #
126 #
127 # If this is an ENOENT error then don't even bother logging.
127 # If this is an ENOENT error then don't even bother logging.
128 # Someone could have removed the file since we retrieved the
128 # Someone could have removed the file since we retrieved the
129 # list of paths.
129 # list of paths.
130 if getattr(ex, 'errno', None) != errno.ENOENT:
130 if getattr(ex, 'errno', None) != errno.ENOENT:
131 ui.warn(_('unable to load pack %s: %s\n') % (filepath, ex))
131 ui.warn(_('unable to load pack %s: %s\n') % (filepath, ex))
132 continue
132 continue
133 packs.append(pack)
133 packs.append(pack)
134
134
135 self.packs = _cachebackedpacks(packs, self.DEFAULTCACHESIZE)
135 self.packs = _cachebackedpacks(packs, self.DEFAULTCACHESIZE)
136
136
137 def _getavailablepackfiles(self):
137 def _getavailablepackfiles(self):
138 """For each pack file (a index/data file combo), yields:
138 """For each pack file (a index/data file combo), yields:
139 (full path without extension, mtime, size)
139 (full path without extension, mtime, size)
140
140
141 mtime will be the mtime of the index/data file (whichever is newer)
141 mtime will be the mtime of the index/data file (whichever is newer)
142 size is the combined size of index/data file
142 size is the combined size of index/data file
143 """
143 """
144 indexsuffixlen = len(self.INDEXSUFFIX)
144 indexsuffixlen = len(self.INDEXSUFFIX)
145 packsuffixlen = len(self.PACKSUFFIX)
145 packsuffixlen = len(self.PACKSUFFIX)
146
146
147 ids = set()
147 ids = set()
148 sizes = collections.defaultdict(lambda: 0)
148 sizes = collections.defaultdict(lambda: 0)
149 mtimes = collections.defaultdict(lambda: [])
149 mtimes = collections.defaultdict(lambda: [])
150 try:
150 try:
151 for filename, type, stat in osutil.listdir(self.path, stat=True):
151 for filename, type, stat in osutil.listdir(self.path, stat=True):
152 id = None
152 id = None
153 if filename[-indexsuffixlen:] == self.INDEXSUFFIX:
153 if filename[-indexsuffixlen:] == self.INDEXSUFFIX:
154 id = filename[:-indexsuffixlen]
154 id = filename[:-indexsuffixlen]
155 elif filename[-packsuffixlen:] == self.PACKSUFFIX:
155 elif filename[-packsuffixlen:] == self.PACKSUFFIX:
156 id = filename[:-packsuffixlen]
156 id = filename[:-packsuffixlen]
157
157
158 # Since we expect to have two files corresponding to each ID
158 # Since we expect to have two files corresponding to each ID
159 # (the index file and the pack file), we can yield once we see
159 # (the index file and the pack file), we can yield once we see
160 # it twice.
160 # it twice.
161 if id:
161 if id:
162 sizes[id] += stat.st_size # Sum both files' sizes together
162 sizes[id] += stat.st_size # Sum both files' sizes together
163 mtimes[id].append(stat.st_mtime)
163 mtimes[id].append(stat.st_mtime)
164 if id in ids:
164 if id in ids:
165 yield (os.path.join(self.path, id), max(mtimes[id]),
165 yield (os.path.join(self.path, id), max(mtimes[id]),
166 sizes[id])
166 sizes[id])
167 else:
167 else:
168 ids.add(id)
168 ids.add(id)
169 except OSError as ex:
169 except OSError as ex:
170 if ex.errno != errno.ENOENT:
170 if ex.errno != errno.ENOENT:
171 raise
171 raise
172
172
173 def _getavailablepackfilessorted(self):
173 def _getavailablepackfilessorted(self):
174 """Like `_getavailablepackfiles`, but also sorts the files by mtime,
174 """Like `_getavailablepackfiles`, but also sorts the files by mtime,
175 yielding newest files first.
175 yielding newest files first.
176
176
177 This is desirable, since it is more likely newer packfiles have more
177 This is desirable, since it is more likely newer packfiles have more
178 desirable data.
178 desirable data.
179 """
179 """
180 files = []
180 files = []
181 for path, mtime, size in self._getavailablepackfiles():
181 for path, mtime, size in self._getavailablepackfiles():
182 files.append((mtime, size, path))
182 files.append((mtime, size, path))
183 files = sorted(files, reverse=True)
183 files = sorted(files, reverse=True)
184 for mtime, size, path in files:
184 for mtime, size, path in files:
185 yield path, mtime, size
185 yield path, mtime, size
186
186
187 def gettotalsizeandcount(self):
187 def gettotalsizeandcount(self):
188 """Returns the total disk size (in bytes) of all the pack files in
188 """Returns the total disk size (in bytes) of all the pack files in
189 this store, and the count of pack files.
189 this store, and the count of pack files.
190
190
191 (This might be smaller than the total size of the ``self.path``
191 (This might be smaller than the total size of the ``self.path``
192 directory, since this only considers fully-written pack files, and not
192 directory, since this only considers fully-written pack files, and not
193 temporary files or other detritus on the directory.)
193 temporary files or other detritus on the directory.)
194 """
194 """
195 totalsize = 0
195 totalsize = 0
196 count = 0
196 count = 0
197 for __, __, size in self._getavailablepackfiles():
197 for __, __, size in self._getavailablepackfiles():
198 totalsize += size
198 totalsize += size
199 count += 1
199 count += 1
200 return totalsize, count
200 return totalsize, count
201
201
202 def getmetrics(self):
202 def getmetrics(self):
203 """Returns metrics on the state of this store."""
203 """Returns metrics on the state of this store."""
204 size, count = self.gettotalsizeandcount()
204 size, count = self.gettotalsizeandcount()
205 return {
205 return {
206 'numpacks': count,
206 'numpacks': count,
207 'totalpacksize': size,
207 'totalpacksize': size,
208 }
208 }
209
209
210 def getpack(self, path):
210 def getpack(self, path):
211 raise NotImplementedError()
211 raise NotImplementedError()
212
212
213 def getmissing(self, keys):
213 def getmissing(self, keys):
214 missing = keys
214 missing = keys
215 for pack in self.packs:
215 for pack in self.packs:
216 missing = pack.getmissing(missing)
216 missing = pack.getmissing(missing)
217
217
218 # Ensures better performance of the cache by keeping the most
218 # Ensures better performance of the cache by keeping the most
219 # recently accessed pack at the beginning in subsequent iterations.
219 # recently accessed pack at the beginning in subsequent iterations.
220 if not missing:
220 if not missing:
221 return missing
221 return missing
222
222
223 if missing:
223 if missing:
224 for pack in self.refresh():
224 for pack in self.refresh():
225 missing = pack.getmissing(missing)
225 missing = pack.getmissing(missing)
226
226
227 return missing
227 return missing
228
228
229 def markledger(self, ledger, options=None):
229 def markledger(self, ledger, options=None):
230 for pack in self.packs:
230 for pack in self.packs:
231 pack.markledger(ledger)
231 pack.markledger(ledger)
232
232
233 def markforrefresh(self):
233 def markforrefresh(self):
234 """Tells the store that there may be new pack files, so the next time it
234 """Tells the store that there may be new pack files, so the next time it
235 has a lookup miss it should check for new files."""
235 has a lookup miss it should check for new files."""
236 self.lastrefresh = 0
236 self.lastrefresh = 0
237
237
238 def refresh(self):
238 def refresh(self):
239 """Checks for any new packs on disk, adds them to the main pack list,
239 """Checks for any new packs on disk, adds them to the main pack list,
240 and returns a list of just the new packs."""
240 and returns a list of just the new packs."""
241 now = time.time()
241 now = time.time()
242
242
243 # If we experience a lot of misses (like in the case of getmissing() on
243 # If we experience a lot of misses (like in the case of getmissing() on
244 # new objects), let's only actually check disk for new stuff every once
244 # new objects), let's only actually check disk for new stuff every once
245 # in a while. Generally this code path should only ever matter when a
245 # in a while. Generally this code path should only ever matter when a
246 # repack is going on in the background, and it should be pretty rare
246 # repack is going on in the background, and it should be pretty rare
247 # for that to happen twice in quick succession.
247 # for that to happen twice in quick succession.
248 newpacks = []
248 newpacks = []
249 if now > self.lastrefresh + REFRESHRATE:
249 if now > self.lastrefresh + REFRESHRATE:
250 self.lastrefresh = now
250 self.lastrefresh = now
251 previous = set(p.path for p in self.packs)
251 previous = set(p.path for p in self.packs)
252 for filepath, __, __ in self._getavailablepackfilessorted():
252 for filepath, __, __ in self._getavailablepackfilessorted():
253 if filepath not in previous:
253 if filepath not in previous:
254 newpack = self.getpack(filepath)
254 newpack = self.getpack(filepath)
255 newpacks.append(newpack)
255 newpacks.append(newpack)
256 self.packs.add(newpack)
256 self.packs.add(newpack)
257
257
258 return newpacks
258 return newpacks
259
259
260 class versionmixin(object):
260 class versionmixin(object):
261 # Mix-in for classes with multiple supported versions
261 # Mix-in for classes with multiple supported versions
262 VERSION = None
262 VERSION = None
263 SUPPORTED_VERSIONS = [2]
263 SUPPORTED_VERSIONS = [2]
264
264
265 def _checkversion(self, version):
265 def _checkversion(self, version):
266 if version in self.SUPPORTED_VERSIONS:
266 if version in self.SUPPORTED_VERSIONS:
267 if self.VERSION is None:
267 if self.VERSION is None:
268 # only affect this instance
268 # only affect this instance
269 self.VERSION = version
269 self.VERSION = version
270 elif self.VERSION != version:
270 elif self.VERSION != version:
271 raise RuntimeError('inconsistent version: %s' % version)
271 raise RuntimeError('inconsistent version: %s' % version)
272 else:
272 else:
273 raise RuntimeError('unsupported version: %s' % version)
273 raise RuntimeError('unsupported version: %s' % version)
274
274
275 class basepack(versionmixin):
275 class basepack(versionmixin):
276 # The maximum amount we should read via mmap before remapping so the old
276 # The maximum amount we should read via mmap before remapping so the old
277 # pages can be released (100MB)
277 # pages can be released (100MB)
278 MAXPAGEDIN = 100 * 1024**2
278 MAXPAGEDIN = 100 * 1024**2
279
279
280 SUPPORTED_VERSIONS = [2]
280 SUPPORTED_VERSIONS = [2]
281
281
282 def __init__(self, path):
282 def __init__(self, path):
283 self.path = path
283 self.path = path
284 self.packpath = path + self.PACKSUFFIX
284 self.packpath = path + self.PACKSUFFIX
285 self.indexpath = path + self.INDEXSUFFIX
285 self.indexpath = path + self.INDEXSUFFIX
286
286
287 self.indexsize = os.stat(self.indexpath).st_size
287 self.indexsize = os.stat(self.indexpath).st_size
288 self.datasize = os.stat(self.packpath).st_size
288 self.datasize = os.stat(self.packpath).st_size
289
289
290 self._index = None
290 self._index = None
291 self._data = None
291 self._data = None
292 self.freememory() # initialize the mmap
292 self.freememory() # initialize the mmap
293
293
294 version = struct.unpack('!B', self._data[:PACKVERSIONSIZE])[0]
294 version = struct.unpack('!B', self._data[:PACKVERSIONSIZE])[0]
295 self._checkversion(version)
295 self._checkversion(version)
296
296
297 version, config = struct.unpack('!BB', self._index[:INDEXVERSIONSIZE])
297 version, config = struct.unpack('!BB', self._index[:INDEXVERSIONSIZE])
298 self._checkversion(version)
298 self._checkversion(version)
299
299
300 if 0b10000000 & config:
300 if 0b10000000 & config:
301 self.params = indexparams(LARGEFANOUTPREFIX, version)
301 self.params = indexparams(LARGEFANOUTPREFIX, version)
302 else:
302 else:
303 self.params = indexparams(SMALLFANOUTPREFIX, version)
303 self.params = indexparams(SMALLFANOUTPREFIX, version)
304
304
305 @util.propertycache
305 @util.propertycache
306 def _fanouttable(self):
306 def _fanouttable(self):
307 params = self.params
307 params = self.params
308 rawfanout = self._index[FANOUTSTART:FANOUTSTART + params.fanoutsize]
308 rawfanout = self._index[FANOUTSTART:FANOUTSTART + params.fanoutsize]
309 fanouttable = []
309 fanouttable = []
310 for i in pycompat.xrange(0, params.fanoutcount):
310 for i in pycompat.xrange(0, params.fanoutcount):
311 loc = i * 4
311 loc = i * 4
312 fanoutentry = struct.unpack('!I', rawfanout[loc:loc + 4])[0]
312 fanoutentry = struct.unpack('!I', rawfanout[loc:loc + 4])[0]
313 fanouttable.append(fanoutentry)
313 fanouttable.append(fanoutentry)
314 return fanouttable
314 return fanouttable
315
315
316 @util.propertycache
316 @util.propertycache
317 def _indexend(self):
317 def _indexend(self):
318 nodecount = struct.unpack_from('!Q', self._index,
318 nodecount = struct.unpack_from('!Q', self._index,
319 self.params.indexstart - 8)[0]
319 self.params.indexstart - 8)[0]
320 return self.params.indexstart + nodecount * self.INDEXENTRYLENGTH
320 return self.params.indexstart + nodecount * self.INDEXENTRYLENGTH
321
321
322 def freememory(self):
322 def freememory(self):
323 """Unmap and remap the memory to free it up after known expensive
323 """Unmap and remap the memory to free it up after known expensive
324 operations. Return True if self._data and self._index were reloaded.
324 operations. Return True if self._data and self._index were reloaded.
325 """
325 """
326 if self._index:
326 if self._index:
327 if self._pagedin < self.MAXPAGEDIN:
327 if self._pagedin < self.MAXPAGEDIN:
328 return False
328 return False
329
329
330 self._index.close()
330 self._index.close()
331 self._data.close()
331 self._data.close()
332
332
333 # TODO: use an opener/vfs to access these paths
333 # TODO: use an opener/vfs to access these paths
334 with open(self.indexpath, PACKOPENMODE) as indexfp:
334 with open(self.indexpath, PACKOPENMODE) as indexfp:
335 # memory-map the file, size 0 means whole file
335 # memory-map the file, size 0 means whole file
336 self._index = mmap.mmap(indexfp.fileno(), 0,
336 self._index = mmap.mmap(indexfp.fileno(), 0,
337 access=mmap.ACCESS_READ)
337 access=mmap.ACCESS_READ)
338 with open(self.packpath, PACKOPENMODE) as datafp:
338 with open(self.packpath, PACKOPENMODE) as datafp:
339 self._data = mmap.mmap(datafp.fileno(), 0, access=mmap.ACCESS_READ)
339 self._data = mmap.mmap(datafp.fileno(), 0, access=mmap.ACCESS_READ)
340
340
341 self._pagedin = 0
341 self._pagedin = 0
342 return True
342 return True
343
343
344 def getmissing(self, keys):
344 def getmissing(self, keys):
345 raise NotImplementedError()
345 raise NotImplementedError()
346
346
347 def markledger(self, ledger, options=None):
347 def markledger(self, ledger, options=None):
348 raise NotImplementedError()
348 raise NotImplementedError()
349
349
350 def cleanup(self, ledger):
350 def cleanup(self, ledger):
351 raise NotImplementedError()
351 raise NotImplementedError()
352
352
353 def __iter__(self):
353 def __iter__(self):
354 raise NotImplementedError()
354 raise NotImplementedError()
355
355
356 def iterentries(self):
356 def iterentries(self):
357 raise NotImplementedError()
357 raise NotImplementedError()
358
358
359 class mutablebasepack(versionmixin):
359 class mutablebasepack(versionmixin):
360
360
361 def __init__(self, ui, packdir, version=2):
361 def __init__(self, ui, packdir, version=2):
362 self._checkversion(version)
362 self._checkversion(version)
363 # TODO(augie): make this configurable
363 # TODO(augie): make this configurable
364 self._compressor = 'GZ'
364 self._compressor = 'GZ'
365 opener = vfsmod.vfs(packdir)
365 opener = vfsmod.vfs(packdir)
366 opener.createmode = 0o444
366 opener.createmode = 0o444
367 self.opener = opener
367 self.opener = opener
368
368
369 self.entries = {}
369 self.entries = {}
370
370
371 shallowutil.mkstickygroupdir(ui, packdir)
371 shallowutil.mkstickygroupdir(ui, packdir)
372 self.packfp, self.packpath = opener.mkstemp(
372 self.packfp, self.packpath = opener.mkstemp(
373 suffix=self.PACKSUFFIX + '-tmp')
373 suffix=self.PACKSUFFIX + '-tmp')
374 self.idxfp, self.idxpath = opener.mkstemp(
374 self.idxfp, self.idxpath = opener.mkstemp(
375 suffix=self.INDEXSUFFIX + '-tmp')
375 suffix=self.INDEXSUFFIX + '-tmp')
376 self.packfp = os.fdopen(self.packfp, r'w+')
376 self.packfp = os.fdopen(self.packfp, r'w+')
377 self.idxfp = os.fdopen(self.idxfp, r'w+')
377 self.idxfp = os.fdopen(self.idxfp, r'w+')
378 self.sha = hashlib.sha1()
378 self.sha = hashlib.sha1()
379 self._closed = False
379 self._closed = False
380
380
381 # The opener provides no way of doing permission fixup on files created
381 # The opener provides no way of doing permission fixup on files created
382 # via mkstemp, so we must fix it ourselves. We can probably fix this
382 # via mkstemp, so we must fix it ourselves. We can probably fix this
383 # upstream in vfs.mkstemp so we don't need to use the private method.
383 # upstream in vfs.mkstemp so we don't need to use the private method.
384 opener._fixfilemode(opener.join(self.packpath))
384 opener._fixfilemode(opener.join(self.packpath))
385 opener._fixfilemode(opener.join(self.idxpath))
385 opener._fixfilemode(opener.join(self.idxpath))
386
386
387 # Write header
387 # Write header
388 # TODO: make it extensible (ex: allow specifying compression algorithm,
388 # TODO: make it extensible (ex: allow specifying compression algorithm,
389 # a flexible key/value header, delta algorithm, fanout size, etc)
389 # a flexible key/value header, delta algorithm, fanout size, etc)
390 versionbuf = struct.pack('!B', self.VERSION) # unsigned 1 byte int
390 versionbuf = struct.pack('!B', self.VERSION) # unsigned 1 byte int
391 self.writeraw(versionbuf)
391 self.writeraw(versionbuf)
392
392
393 def __enter__(self):
393 def __enter__(self):
394 return self
394 return self
395
395
396 def __exit__(self, exc_type, exc_value, traceback):
396 def __exit__(self, exc_type, exc_value, traceback):
397 if exc_type is None:
397 if exc_type is None:
398 self.close()
398 self.close()
399 else:
399 else:
400 self.abort()
400 self.abort()
401
401
402 def abort(self):
402 def abort(self):
403 # Unclean exit
403 # Unclean exit
404 self._cleantemppacks()
404 self._cleantemppacks()
405
405
406 def writeraw(self, data):
406 def writeraw(self, data):
407 self.packfp.write(data)
407 self.packfp.write(data)
408 self.sha.update(data)
408 self.sha.update(data)
409
409
410 def close(self, ledger=None):
410 def close(self, ledger=None):
411 if self._closed:
411 if self._closed:
412 return
412 return
413
413
414 try:
414 try:
415 sha = self.sha.hexdigest()
415 sha = self.sha.hexdigest()
416 self.packfp.close()
416 self.packfp.close()
417 self.writeindex()
417 self.writeindex()
418
418
419 if len(self.entries) == 0:
419 if len(self.entries) == 0:
420 # Empty pack
420 # Empty pack
421 self._cleantemppacks()
421 self._cleantemppacks()
422 self._closed = True
422 self._closed = True
423 return None
423 return None
424
424
425 self.opener.rename(self.packpath, sha + self.PACKSUFFIX)
425 self.opener.rename(self.packpath, sha + self.PACKSUFFIX)
426 try:
426 try:
427 self.opener.rename(self.idxpath, sha + self.INDEXSUFFIX)
427 self.opener.rename(self.idxpath, sha + self.INDEXSUFFIX)
428 except Exception as ex:
428 except Exception as ex:
429 try:
429 try:
430 self.opener.unlink(sha + self.PACKSUFFIX)
430 self.opener.unlink(sha + self.PACKSUFFIX)
431 except Exception:
431 except Exception:
432 pass
432 pass
433 # Throw exception 'ex' explicitly since a normal 'raise' would
433 # Throw exception 'ex' explicitly since a normal 'raise' would
434 # potentially throw an exception from the unlink cleanup.
434 # potentially throw an exception from the unlink cleanup.
435 raise ex
435 raise ex
436 except Exception:
436 except Exception:
437 # Clean up temp packs in all exception cases
437 # Clean up temp packs in all exception cases
438 self._cleantemppacks()
438 self._cleantemppacks()
439 raise
439 raise
440
440
441 self._closed = True
441 self._closed = True
442 result = self.opener.join(sha)
442 result = self.opener.join(sha)
443 if ledger:
443 if ledger:
444 ledger.addcreated(result)
444 ledger.addcreated(result)
445 return result
445 return result
446
446
447 def _cleantemppacks(self):
447 def _cleantemppacks(self):
448 try:
448 try:
449 self.opener.unlink(self.packpath)
449 self.opener.unlink(self.packpath)
450 except Exception:
450 except Exception:
451 pass
451 pass
452 try:
452 try:
453 self.opener.unlink(self.idxpath)
453 self.opener.unlink(self.idxpath)
454 except Exception:
454 except Exception:
455 pass
455 pass
456
456
457 def writeindex(self):
457 def writeindex(self):
458 rawindex = ''
458 rawindex = ''
459
459
460 largefanout = len(self.entries) > SMALLFANOUTCUTOFF
460 largefanout = len(self.entries) > SMALLFANOUTCUTOFF
461 if largefanout:
461 if largefanout:
462 params = indexparams(LARGEFANOUTPREFIX, self.VERSION)
462 params = indexparams(LARGEFANOUTPREFIX, self.VERSION)
463 else:
463 else:
464 params = indexparams(SMALLFANOUTPREFIX, self.VERSION)
464 params = indexparams(SMALLFANOUTPREFIX, self.VERSION)
465
465
466 fanouttable = [EMPTYFANOUT] * params.fanoutcount
466 fanouttable = [EMPTYFANOUT] * params.fanoutcount
467
467
468 # Precompute the location of each entry
468 # Precompute the location of each entry
469 locations = {}
469 locations = {}
470 count = 0
470 count = 0
471 for node in sorted(self.entries.iterkeys()):
471 for node in sorted(self.entries):
472 location = count * self.INDEXENTRYLENGTH
472 location = count * self.INDEXENTRYLENGTH
473 locations[node] = location
473 locations[node] = location
474 count += 1
474 count += 1
475
475
476 # Must use [0] on the unpack result since it's always a tuple.
476 # Must use [0] on the unpack result since it's always a tuple.
477 fanoutkey = struct.unpack(params.fanoutstruct,
477 fanoutkey = struct.unpack(params.fanoutstruct,
478 node[:params.fanoutprefix])[0]
478 node[:params.fanoutprefix])[0]
479 if fanouttable[fanoutkey] == EMPTYFANOUT:
479 if fanouttable[fanoutkey] == EMPTYFANOUT:
480 fanouttable[fanoutkey] = location
480 fanouttable[fanoutkey] = location
481
481
482 rawfanouttable = ''
482 rawfanouttable = ''
483 last = 0
483 last = 0
484 for offset in fanouttable:
484 for offset in fanouttable:
485 offset = offset if offset != EMPTYFANOUT else last
485 offset = offset if offset != EMPTYFANOUT else last
486 last = offset
486 last = offset
487 rawfanouttable += struct.pack('!I', offset)
487 rawfanouttable += struct.pack('!I', offset)
488
488
489 rawentrieslength = struct.pack('!Q', len(self.entries))
489 rawentrieslength = struct.pack('!Q', len(self.entries))
490
490
491 # The index offset is its location in the file: right after the 2 byte
491 # The index offset is its location in the file: right after the 2 byte
492 # header and the fanouttable.
492 # header and the fanouttable.
493 rawindex = self.createindex(locations, 2 + len(rawfanouttable))
493 rawindex = self.createindex(locations, 2 + len(rawfanouttable))
494
494
495 self._writeheader(params)
495 self._writeheader(params)
496 self.idxfp.write(rawfanouttable)
496 self.idxfp.write(rawfanouttable)
497 self.idxfp.write(rawentrieslength)
497 self.idxfp.write(rawentrieslength)
498 self.idxfp.write(rawindex)
498 self.idxfp.write(rawindex)
499 self.idxfp.close()
499 self.idxfp.close()
500
500
501 def createindex(self, nodelocations):
501 def createindex(self, nodelocations):
502 raise NotImplementedError()
502 raise NotImplementedError()
503
503
504 def _writeheader(self, indexparams):
504 def _writeheader(self, indexparams):
505 # Index header
505 # Index header
506 # <version: 1 byte>
506 # <version: 1 byte>
507 # <large fanout: 1 bit> # 1 means 2^16, 0 means 2^8
507 # <large fanout: 1 bit> # 1 means 2^16, 0 means 2^8
508 # <unused: 7 bit> # future use (compression, delta format, etc)
508 # <unused: 7 bit> # future use (compression, delta format, etc)
509 config = 0
509 config = 0
510 if indexparams.fanoutprefix == LARGEFANOUTPREFIX:
510 if indexparams.fanoutprefix == LARGEFANOUTPREFIX:
511 config = 0b10000000
511 config = 0b10000000
512 self.idxfp.write(struct.pack('!BB', self.VERSION, config))
512 self.idxfp.write(struct.pack('!BB', self.VERSION, config))
513
513
514 class indexparams(object):
514 class indexparams(object):
515 __slots__ = (r'fanoutprefix', r'fanoutstruct', r'fanoutcount',
515 __slots__ = (r'fanoutprefix', r'fanoutstruct', r'fanoutcount',
516 r'fanoutsize', r'indexstart')
516 r'fanoutsize', r'indexstart')
517
517
518 def __init__(self, prefixsize, version):
518 def __init__(self, prefixsize, version):
519 self.fanoutprefix = prefixsize
519 self.fanoutprefix = prefixsize
520
520
521 # The struct pack format for fanout table location (i.e. the format that
521 # The struct pack format for fanout table location (i.e. the format that
522 # converts the node prefix into an integer location in the fanout
522 # converts the node prefix into an integer location in the fanout
523 # table).
523 # table).
524 if prefixsize == SMALLFANOUTPREFIX:
524 if prefixsize == SMALLFANOUTPREFIX:
525 self.fanoutstruct = '!B'
525 self.fanoutstruct = '!B'
526 elif prefixsize == LARGEFANOUTPREFIX:
526 elif prefixsize == LARGEFANOUTPREFIX:
527 self.fanoutstruct = '!H'
527 self.fanoutstruct = '!H'
528 else:
528 else:
529 raise ValueError("invalid fanout prefix size: %s" % prefixsize)
529 raise ValueError("invalid fanout prefix size: %s" % prefixsize)
530
530
531 # The number of fanout table entries
531 # The number of fanout table entries
532 self.fanoutcount = 2**(prefixsize * 8)
532 self.fanoutcount = 2**(prefixsize * 8)
533
533
534 # The total bytes used by the fanout table
534 # The total bytes used by the fanout table
535 self.fanoutsize = self.fanoutcount * 4
535 self.fanoutsize = self.fanoutcount * 4
536
536
537 self.indexstart = FANOUTSTART + self.fanoutsize
537 self.indexstart = FANOUTSTART + self.fanoutsize
538 # Skip the index length
538 # Skip the index length
539 self.indexstart += 8
539 self.indexstart += 8
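The fanout lookup described in the comments above boils down to reading the first one or two bytes of a node as a big-endian integer, which is exactly what writeindex() and _fanouttable compute with struct.unpack. A minimal, self-contained sketch of that key computation (the node value is made up):

    import struct

    SMALLFANOUTPREFIX = 1    # key on the first byte      -> 2**8 fanout slots
    LARGEFANOUTPREFIX = 2    # key on the first two bytes -> 2**16 fanout slots

    def fanoutkey(node, prefixsize):
        # Same unpack as basepack: '!B' for a 1-byte prefix, '!H' for 2 bytes.
        fmt = '!B' if prefixsize == SMALLFANOUTPREFIX else '!H'
        return struct.unpack(fmt, node[:prefixsize])[0]

    node = b'\xab\xcd' + b'\x00' * 18            # made-up 20-byte node hash

    print(fanoutkey(node, SMALLFANOUTPREFIX))    # 171    (0xab)
    print(fanoutkey(node, LARGEFANOUTPREFIX))    # 43981  (0xabcd)

Each fanout slot holds the offset of the first index entry whose node starts with that prefix; empty slots repeat the previous offset, which is what the EMPTYFANOUT / last loop in writeindex() writes out.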
@@ -1,589 +1,589 b''
1 # fileserverclient.py - client for communicating with the cache process
1 # fileserverclient.py - client for communicating with the cache process
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import io
11 import io
12 import os
12 import os
13 import threading
13 import threading
14 import time
14 import time
15 import zlib
15 import zlib
16
16
17 from mercurial.i18n import _
17 from mercurial.i18n import _
18 from mercurial.node import bin, hex, nullid
18 from mercurial.node import bin, hex, nullid
19 from mercurial import (
19 from mercurial import (
20 error,
20 error,
21 node,
21 node,
22 pycompat,
22 pycompat,
23 revlog,
23 revlog,
24 sshpeer,
24 sshpeer,
25 util,
25 util,
26 wireprotov1peer,
26 wireprotov1peer,
27 )
27 )
28 from mercurial.utils import procutil
28 from mercurial.utils import procutil
29
29
30 from . import (
30 from . import (
31 constants,
31 constants,
32 contentstore,
32 contentstore,
33 metadatastore,
33 metadatastore,
34 )
34 )
35
35
36 _sshv1peer = sshpeer.sshv1peer
36 _sshv1peer = sshpeer.sshv1peer
37
37
38 # Statistics for debugging
38 # Statistics for debugging
39 fetchcost = 0
39 fetchcost = 0
40 fetches = 0
40 fetches = 0
41 fetched = 0
41 fetched = 0
42 fetchmisses = 0
42 fetchmisses = 0
43
43
44 _lfsmod = None
44 _lfsmod = None
45 _downloading = _('downloading')
45 _downloading = _('downloading')
46
46
47 def getcachekey(reponame, file, id):
47 def getcachekey(reponame, file, id):
48 pathhash = node.hex(hashlib.sha1(file).digest())
48 pathhash = node.hex(hashlib.sha1(file).digest())
49 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
49 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
50
50
51 def getlocalkey(file, id):
51 def getlocalkey(file, id):
52 pathhash = node.hex(hashlib.sha1(file).digest())
52 pathhash = node.hex(hashlib.sha1(file).digest())
53 return os.path.join(pathhash, id)
53 return os.path.join(pathhash, id)
54
54
55 def peersetup(ui, peer):
55 def peersetup(ui, peer):
56
56
57 class remotefilepeer(peer.__class__):
57 class remotefilepeer(peer.__class__):
58 @wireprotov1peer.batchable
58 @wireprotov1peer.batchable
59 def x_rfl_getfile(self, file, node):
59 def x_rfl_getfile(self, file, node):
60 if not self.capable('x_rfl_getfile'):
60 if not self.capable('x_rfl_getfile'):
61 raise error.Abort(
61 raise error.Abort(
62 'configured remotefile server does not support getfile')
62 'configured remotefile server does not support getfile')
63 f = wireprotov1peer.future()
63 f = wireprotov1peer.future()
64 yield {'file': file, 'node': node}, f
64 yield {'file': file, 'node': node}, f
65 code, data = f.value.split('\0', 1)
65 code, data = f.value.split('\0', 1)
66 if int(code):
66 if int(code):
67 raise error.LookupError(file, node, data)
67 raise error.LookupError(file, node, data)
68 yield data
68 yield data
69
69
70 @wireprotov1peer.batchable
70 @wireprotov1peer.batchable
71 def x_rfl_getflogheads(self, path):
71 def x_rfl_getflogheads(self, path):
72 if not self.capable('x_rfl_getflogheads'):
72 if not self.capable('x_rfl_getflogheads'):
73 raise error.Abort('configured remotefile server does not '
73 raise error.Abort('configured remotefile server does not '
74 'support getflogheads')
74 'support getflogheads')
75 f = wireprotov1peer.future()
75 f = wireprotov1peer.future()
76 yield {'path': path}, f
76 yield {'path': path}, f
77 heads = f.value.split('\n') if f.value else []
77 heads = f.value.split('\n') if f.value else []
78 yield heads
78 yield heads
79
79
80 def _updatecallstreamopts(self, command, opts):
80 def _updatecallstreamopts(self, command, opts):
81 if command != 'getbundle':
81 if command != 'getbundle':
82 return
82 return
83 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
83 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
84 not in self.capabilities()):
84 not in self.capabilities()):
85 return
85 return
86 if not util.safehasattr(self, '_localrepo'):
86 if not util.safehasattr(self, '_localrepo'):
87 return
87 return
88 if (constants.SHALLOWREPO_REQUIREMENT
88 if (constants.SHALLOWREPO_REQUIREMENT
89 not in self._localrepo.requirements):
89 not in self._localrepo.requirements):
90 return
90 return
91
91
92 bundlecaps = opts.get('bundlecaps')
92 bundlecaps = opts.get('bundlecaps')
93 if bundlecaps:
93 if bundlecaps:
94 bundlecaps = [bundlecaps]
94 bundlecaps = [bundlecaps]
95 else:
95 else:
96 bundlecaps = []
96 bundlecaps = []
97
97
98 # shallow, includepattern, and excludepattern are a hacky way of
98 # shallow, includepattern, and excludepattern are a hacky way of
99 # carrying over data from the local repo to this getbundle
99 # carrying over data from the local repo to this getbundle
100 # command. We need to do it this way because bundle1 getbundle
100 # command. We need to do it this way because bundle1 getbundle
101 # doesn't provide any other place we can hook in to manipulate
101 # doesn't provide any other place we can hook in to manipulate
102 # getbundle args before it goes across the wire. Once we get rid
102 # getbundle args before it goes across the wire. Once we get rid
103 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
103 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
104 # do this more cleanly.
104 # do this more cleanly.
105 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
105 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
106 if self._localrepo.includepattern:
106 if self._localrepo.includepattern:
107 patterns = '\0'.join(self._localrepo.includepattern)
107 patterns = '\0'.join(self._localrepo.includepattern)
108 includecap = "includepattern=" + patterns
108 includecap = "includepattern=" + patterns
109 bundlecaps.append(includecap)
109 bundlecaps.append(includecap)
110 if self._localrepo.excludepattern:
110 if self._localrepo.excludepattern:
111 patterns = '\0'.join(self._localrepo.excludepattern)
111 patterns = '\0'.join(self._localrepo.excludepattern)
112 excludecap = "excludepattern=" + patterns
112 excludecap = "excludepattern=" + patterns
113 bundlecaps.append(excludecap)
113 bundlecaps.append(excludecap)
114 opts['bundlecaps'] = ','.join(bundlecaps)
114 opts['bundlecaps'] = ','.join(bundlecaps)
115
115
116 def _sendrequest(self, command, args, **opts):
116 def _sendrequest(self, command, args, **opts):
117 self._updatecallstreamopts(command, args)
117 self._updatecallstreamopts(command, args)
118 return super(remotefilepeer, self)._sendrequest(command, args,
118 return super(remotefilepeer, self)._sendrequest(command, args,
119 **opts)
119 **opts)
120
120
121 def _callstream(self, command, **opts):
121 def _callstream(self, command, **opts):
122 supertype = super(remotefilepeer, self)
122 supertype = super(remotefilepeer, self)
123 if not util.safehasattr(supertype, '_sendrequest'):
123 if not util.safehasattr(supertype, '_sendrequest'):
124 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
124 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
125 return super(remotefilepeer, self)._callstream(command, **opts)
125 return super(remotefilepeer, self)._callstream(command, **opts)
126
126
127 peer.__class__ = remotefilepeer
127 peer.__class__ = remotefilepeer
128
128
129 class cacheconnection(object):
129 class cacheconnection(object):
130 """The connection for communicating with the remote cache. Performs
130 """The connection for communicating with the remote cache. Performs
131 gets and sets by communicating with an external process that has the
131 gets and sets by communicating with an external process that has the
132 cache-specific implementation.
132 cache-specific implementation.
133 """
133 """
134 def __init__(self):
134 def __init__(self):
135 self.pipeo = self.pipei = self.pipee = None
135 self.pipeo = self.pipei = self.pipee = None
136 self.subprocess = None
136 self.subprocess = None
137 self.connected = False
137 self.connected = False
138
138
139 def connect(self, cachecommand):
139 def connect(self, cachecommand):
140 if self.pipeo:
140 if self.pipeo:
141 raise error.Abort(_("cache connection already open"))
141 raise error.Abort(_("cache connection already open"))
142 self.pipei, self.pipeo, self.pipee, self.subprocess = \
142 self.pipei, self.pipeo, self.pipee, self.subprocess = \
143 procutil.popen4(cachecommand)
143 procutil.popen4(cachecommand)
144 self.connected = True
144 self.connected = True
145
145
146 def close(self):
146 def close(self):
147 def tryclose(pipe):
147 def tryclose(pipe):
148 try:
148 try:
149 pipe.close()
149 pipe.close()
150 except Exception:
150 except Exception:
151 pass
151 pass
152 if self.connected:
152 if self.connected:
153 try:
153 try:
154 self.pipei.write("exit\n")
154 self.pipei.write("exit\n")
155 except Exception:
155 except Exception:
156 pass
156 pass
157 tryclose(self.pipei)
157 tryclose(self.pipei)
158 self.pipei = None
158 self.pipei = None
159 tryclose(self.pipeo)
159 tryclose(self.pipeo)
160 self.pipeo = None
160 self.pipeo = None
161 tryclose(self.pipee)
161 tryclose(self.pipee)
162 self.pipee = None
162 self.pipee = None
163 try:
163 try:
164 # Wait for process to terminate, making sure to avoid deadlock.
164 # Wait for process to terminate, making sure to avoid deadlock.
165 # See https://docs.python.org/2/library/subprocess.html for
165 # See https://docs.python.org/2/library/subprocess.html for
166 # warnings about wait() and deadlocking.
166 # warnings about wait() and deadlocking.
167 self.subprocess.communicate()
167 self.subprocess.communicate()
168 except Exception:
168 except Exception:
169 pass
169 pass
170 self.subprocess = None
170 self.subprocess = None
171 self.connected = False
171 self.connected = False
172
172
173 def request(self, request, flush=True):
173 def request(self, request, flush=True):
174 if self.connected:
174 if self.connected:
175 try:
175 try:
176 self.pipei.write(request)
176 self.pipei.write(request)
177 if flush:
177 if flush:
178 self.pipei.flush()
178 self.pipei.flush()
179 except IOError:
179 except IOError:
180 self.close()
180 self.close()
181
181
182 def receiveline(self):
182 def receiveline(self):
183 if not self.connected:
183 if not self.connected:
184 return None
184 return None
185 try:
185 try:
186 result = self.pipeo.readline()[:-1]
186 result = self.pipeo.readline()[:-1]
187 if not result:
187 if not result:
188 self.close()
188 self.close()
189 except IOError:
189 except IOError:
190 self.close()
190 self.close()
191
191
192 return result
192 return result
193
193
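The docstring above calls this an external process with the cache-specific implementation; the wire format it speaks is visible in fileserverclient.request() further down: a command line ('get', 'set' or 'exit'), a count line, then one cache key per line, with the process answering a 'get' by printing the keys it is missing and ending the reply with a line containing just '0' (lines starting with '_hits_' report progress). A hypothetical minimal cacheprocess that satisfies that contract by treating every key as a miss could look like this:

    #!/usr/bin/env python
    # Hypothetical stand-in cacheprocess: every 'get' key is reported as a
    # miss, 'set' is accepted and ignored, 'exit' (or EOF) terminates.
    from __future__ import absolute_import
    import sys

    def main():
        while True:
            cmd = sys.stdin.readline().strip()
            if not cmd or cmd == 'exit':
                break
            count = int(sys.stdin.readline())
            keys = [sys.stdin.readline().rstrip('\n') for _ in range(count)]
            if cmd == 'get':
                for key in keys:
                    sys.stdout.write(key + '\n')  # a real cache would only echo misses
                sys.stdout.write('0\n')           # '0' ends the reply to a 'get'
                sys.stdout.flush()
            # 'set' lists keys the client fetched from the server itself; a real
            # cache would store the corresponding file data here.

    if __name__ == '__main__':
        main()

Pointing the remotefilelog.cacheprocess config at a script like this exercises the fallback path where every file is fetched from the server instead of the shared cache.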
194 def _getfilesbatch(
194 def _getfilesbatch(
195 remote, receivemissing, progresstick, missed, idmap, batchsize):
195 remote, receivemissing, progresstick, missed, idmap, batchsize):
196 # Over http(s), iterbatch is a streamy method and we can start
196 # Over http(s), iterbatch is a streamy method and we can start
197 # looking at results early. This means we send one (potentially
197 # looking at results early. This means we send one (potentially
198 # large) request, but then we show nice progress as we process
198 # large) request, but then we show nice progress as we process
199 # file results, rather than showing chunks of $batchsize in
199 # file results, rather than showing chunks of $batchsize in
200 # progress.
200 # progress.
201 #
201 #
202 # Over ssh, iterbatch isn't streamy because batch() wasn't
202 # Over ssh, iterbatch isn't streamy because batch() wasn't
203 # explicitly designed as a streaming method. In the future we
203 # explicitly designed as a streaming method. In the future we
204 # should probably introduce a streambatch() method upstream and
204 # should probably introduce a streambatch() method upstream and
205 # use that for this.
205 # use that for this.
206 with remote.commandexecutor() as e:
206 with remote.commandexecutor() as e:
207 futures = []
207 futures = []
208 for m in missed:
208 for m in missed:
209 futures.append(e.callcommand('x_rfl_getfile', {
209 futures.append(e.callcommand('x_rfl_getfile', {
210 'file': idmap[m],
210 'file': idmap[m],
211 'node': m[-40:]
211 'node': m[-40:]
212 }))
212 }))
213
213
214 for i, m in enumerate(missed):
214 for i, m in enumerate(missed):
215 r = futures[i].result()
215 r = futures[i].result()
216 futures[i] = None # release memory
216 futures[i] = None # release memory
217 file_ = idmap[m]
217 file_ = idmap[m]
218 node = m[-40:]
218 node = m[-40:]
219 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
219 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
220 progresstick()
220 progresstick()
221
221
222 def _getfiles_optimistic(
222 def _getfiles_optimistic(
223 remote, receivemissing, progresstick, missed, idmap, step):
223 remote, receivemissing, progresstick, missed, idmap, step):
224 remote._callstream("x_rfl_getfiles")
224 remote._callstream("x_rfl_getfiles")
225 i = 0
225 i = 0
226 pipeo = remote._pipeo
226 pipeo = remote._pipeo
227 pipei = remote._pipei
227 pipei = remote._pipei
228 while i < len(missed):
228 while i < len(missed):
229 # issue a batch of requests
229 # issue a batch of requests
230 start = i
230 start = i
231 end = min(len(missed), start + step)
231 end = min(len(missed), start + step)
232 i = end
232 i = end
233 for missingid in missed[start:end]:
233 for missingid in missed[start:end]:
234 # issue new request
234 # issue new request
235 versionid = missingid[-40:]
235 versionid = missingid[-40:]
236 file = idmap[missingid]
236 file = idmap[missingid]
237 sshrequest = "%s%s\n" % (versionid, file)
237 sshrequest = "%s%s\n" % (versionid, file)
238 pipeo.write(sshrequest)
238 pipeo.write(sshrequest)
239 pipeo.flush()
239 pipeo.flush()
240
240
241 # receive batch results
241 # receive batch results
242 for missingid in missed[start:end]:
242 for missingid in missed[start:end]:
243 versionid = missingid[-40:]
243 versionid = missingid[-40:]
244 file = idmap[missingid]
244 file = idmap[missingid]
245 receivemissing(pipei, file, versionid)
245 receivemissing(pipei, file, versionid)
246 progresstick()
246 progresstick()
247
247
248 # End the command
248 # End the command
249 pipeo.write('\n')
249 pipeo.write('\n')
250 pipeo.flush()
250 pipeo.flush()
251
251
252 def _getfiles_threaded(
252 def _getfiles_threaded(
253 remote, receivemissing, progresstick, missed, idmap, step):
253 remote, receivemissing, progresstick, missed, idmap, step):
254 remote._callstream("getfiles")
254 remote._callstream("getfiles")
255 pipeo = remote._pipeo
255 pipeo = remote._pipeo
256 pipei = remote._pipei
256 pipei = remote._pipei
257
257
258 def writer():
258 def writer():
259 for missingid in missed:
259 for missingid in missed:
260 versionid = missingid[-40:]
260 versionid = missingid[-40:]
261 file = idmap[missingid]
261 file = idmap[missingid]
262 sshrequest = "%s%s\n" % (versionid, file)
262 sshrequest = "%s%s\n" % (versionid, file)
263 pipeo.write(sshrequest)
263 pipeo.write(sshrequest)
264 pipeo.flush()
264 pipeo.flush()
265 writerthread = threading.Thread(target=writer)
265 writerthread = threading.Thread(target=writer)
266 writerthread.daemon = True
266 writerthread.daemon = True
267 writerthread.start()
267 writerthread.start()
268
268
269 for missingid in missed:
269 for missingid in missed:
270 versionid = missingid[-40:]
270 versionid = missingid[-40:]
271 file = idmap[missingid]
271 file = idmap[missingid]
272 receivemissing(pipei, file, versionid)
272 receivemissing(pipei, file, versionid)
273 progresstick()
273 progresstick()
274
274
275 writerthread.join()
275 writerthread.join()
276 # End the command
276 # End the command
277 pipeo.write('\n')
277 pipeo.write('\n')
278 pipeo.flush()
278 pipeo.flush()
279
279
280 class fileserverclient(object):
280 class fileserverclient(object):
281 """A client for requesting files from the remote file server.
281 """A client for requesting files from the remote file server.
282 """
282 """
283 def __init__(self, repo):
283 def __init__(self, repo):
284 ui = repo.ui
284 ui = repo.ui
285 self.repo = repo
285 self.repo = repo
286 self.ui = ui
286 self.ui = ui
287 self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
287 self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
288 if self.cacheprocess:
288 if self.cacheprocess:
289 self.cacheprocess = util.expandpath(self.cacheprocess)
289 self.cacheprocess = util.expandpath(self.cacheprocess)
290
290
291 # This option causes remotefilelog to pass the full file path to the
291 # This option causes remotefilelog to pass the full file path to the
292 # cacheprocess instead of a hashed key.
292 # cacheprocess instead of a hashed key.
293 self.cacheprocesspasspath = ui.configbool(
293 self.cacheprocesspasspath = ui.configbool(
294 "remotefilelog", "cacheprocess.includepath")
294 "remotefilelog", "cacheprocess.includepath")
295
295
296 self.debugoutput = ui.configbool("remotefilelog", "debug")
296 self.debugoutput = ui.configbool("remotefilelog", "debug")
297
297
298 self.remotecache = cacheconnection()
298 self.remotecache = cacheconnection()
299
299
300 def setstore(self, datastore, historystore, writedata, writehistory):
300 def setstore(self, datastore, historystore, writedata, writehistory):
301 self.datastore = datastore
301 self.datastore = datastore
302 self.historystore = historystore
302 self.historystore = historystore
303 self.writedata = writedata
303 self.writedata = writedata
304 self.writehistory = writehistory
304 self.writehistory = writehistory
305
305
306 def _connect(self):
306 def _connect(self):
307 return self.repo.connectionpool.get(self.repo.fallbackpath)
307 return self.repo.connectionpool.get(self.repo.fallbackpath)
308
308
309 def request(self, fileids):
309 def request(self, fileids):
310 """Takes a list of filename/node pairs and fetches them from the
310 """Takes a list of filename/node pairs and fetches them from the
311 server. Files are stored in the local cache.
311 server. Files are stored in the local cache.
312 A list of nodes that the server couldn't find is returned.
312 A list of nodes that the server couldn't find is returned.
313 If the connection fails, an exception is raised.
313 If the connection fails, an exception is raised.
314 """
314 """
315 if not self.remotecache.connected:
315 if not self.remotecache.connected:
316 self.connect()
316 self.connect()
317 cache = self.remotecache
317 cache = self.remotecache
318 writedata = self.writedata
318 writedata = self.writedata
319
319
320 repo = self.repo
320 repo = self.repo
321 count = len(fileids)
321 count = len(fileids)
322 request = "get\n%d\n" % count
322 request = "get\n%d\n" % count
323 idmap = {}
323 idmap = {}
324 reponame = repo.name
324 reponame = repo.name
325 for file, id in fileids:
325 for file, id in fileids:
326 fullid = getcachekey(reponame, file, id)
326 fullid = getcachekey(reponame, file, id)
327 if self.cacheprocesspasspath:
327 if self.cacheprocesspasspath:
328 request += file + '\0'
328 request += file + '\0'
329 request += fullid + "\n"
329 request += fullid + "\n"
330 idmap[fullid] = file
330 idmap[fullid] = file
331
331
332 cache.request(request)
332 cache.request(request)
333
333
334 total = count
334 total = count
335 self.ui.progress(_downloading, 0, total=count)
335 self.ui.progress(_downloading, 0, total=count)
336
336
337 missed = []
337 missed = []
338 count = 0
338 count = 0
339 while True:
339 while True:
340 missingid = cache.receiveline()
340 missingid = cache.receiveline()
341 if not missingid:
341 if not missingid:
342 missedset = set(missed)
342 missedset = set(missed)
343 for missingid in idmap.iterkeys():
343 for missingid in idmap:
344 if not missingid in missedset:
344 if not missingid in missedset:
345 missed.append(missingid)
345 missed.append(missingid)
346 self.ui.warn(_("warning: cache connection closed early - " +
346 self.ui.warn(_("warning: cache connection closed early - " +
347 "falling back to server\n"))
347 "falling back to server\n"))
348 break
348 break
349 if missingid == "0":
349 if missingid == "0":
350 break
350 break
351 if missingid.startswith("_hits_"):
351 if missingid.startswith("_hits_"):
352 # receive progress reports
352 # receive progress reports
353 parts = missingid.split("_")
353 parts = missingid.split("_")
354 count += int(parts[2])
354 count += int(parts[2])
355 self.ui.progress(_downloading, count, total=total)
355 self.ui.progress(_downloading, count, total=total)
356 continue
356 continue
357
357
358 missed.append(missingid)
358 missed.append(missingid)
359
359
360 global fetchmisses
360 global fetchmisses
361 fetchmisses += len(missed)
361 fetchmisses += len(missed)
362
362
363 count = [total - len(missed)]
363 count = [total - len(missed)]
364 fromcache = count[0]
364 fromcache = count[0]
365 self.ui.progress(_downloading, count[0], total=total)
365 self.ui.progress(_downloading, count[0], total=total)
366 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
366 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
367 count[0], total, hit=count[0], total=total)
367 count[0], total, hit=count[0], total=total)
368
368
369 oldumask = os.umask(0o002)
369 oldumask = os.umask(0o002)
370 try:
370 try:
371 # receive cache misses from master
371 # receive cache misses from master
372 if missed:
372 if missed:
373 def progresstick():
373 def progresstick():
374 count[0] += 1
374 count[0] += 1
375 self.ui.progress(_downloading, count[0], total=total)
375 self.ui.progress(_downloading, count[0], total=total)
376 # When verbose is true, sshpeer prints 'running ssh...'
376 # When verbose is true, sshpeer prints 'running ssh...'
377 # to stdout, which can interfere with some command
377 # to stdout, which can interfere with some command
378 # outputs
378 # outputs
379 verbose = self.ui.verbose
379 verbose = self.ui.verbose
380 self.ui.verbose = False
380 self.ui.verbose = False
381 try:
381 try:
382 with self._connect() as conn:
382 with self._connect() as conn:
383 remote = conn.peer
383 remote = conn.peer
384 if remote.capable(
384 if remote.capable(
385 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
385 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
386 if not isinstance(remote, _sshv1peer):
386 if not isinstance(remote, _sshv1peer):
387 raise error.Abort('remotefilelog requires ssh '
387 raise error.Abort('remotefilelog requires ssh '
388 'servers')
388 'servers')
389 step = self.ui.configint('remotefilelog',
389 step = self.ui.configint('remotefilelog',
390 'getfilesstep')
390 'getfilesstep')
391 getfilestype = self.ui.config('remotefilelog',
391 getfilestype = self.ui.config('remotefilelog',
392 'getfilestype')
392 'getfilestype')
393 if getfilestype == 'threaded':
393 if getfilestype == 'threaded':
394 _getfiles = _getfiles_threaded
394 _getfiles = _getfiles_threaded
395 else:
395 else:
396 _getfiles = _getfiles_optimistic
396 _getfiles = _getfiles_optimistic
397 _getfiles(remote, self.receivemissing, progresstick,
397 _getfiles(remote, self.receivemissing, progresstick,
398 missed, idmap, step)
398 missed, idmap, step)
399 elif remote.capable("x_rfl_getfile"):
399 elif remote.capable("x_rfl_getfile"):
400 if remote.capable('batch'):
400 if remote.capable('batch'):
401 batchdefault = 100
401 batchdefault = 100
402 else:
402 else:
403 batchdefault = 10
403 batchdefault = 10
404 batchsize = self.ui.configint(
404 batchsize = self.ui.configint(
405 'remotefilelog', 'batchsize', batchdefault)
405 'remotefilelog', 'batchsize', batchdefault)
406 _getfilesbatch(
406 _getfilesbatch(
407 remote, self.receivemissing, progresstick,
407 remote, self.receivemissing, progresstick,
408 missed, idmap, batchsize)
408 missed, idmap, batchsize)
409 else:
409 else:
410 raise error.Abort("configured remotefilelog server"
410 raise error.Abort("configured remotefilelog server"
411 " does not support remotefilelog")
411 " does not support remotefilelog")
412
412
413 self.ui.log("remotefilefetchlog",
413 self.ui.log("remotefilefetchlog",
414 "Success\n",
414 "Success\n",
415 fetched_files = count[0] - fromcache,
415 fetched_files = count[0] - fromcache,
416 total_to_fetch = total - fromcache)
416 total_to_fetch = total - fromcache)
417 except Exception:
417 except Exception:
418 self.ui.log("remotefilefetchlog",
418 self.ui.log("remotefilefetchlog",
419 "Fail\n",
419 "Fail\n",
420 fetched_files = count[0] - fromcache,
420 fetched_files = count[0] - fromcache,
421 total_to_fetch = total - fromcache)
421 total_to_fetch = total - fromcache)
422 raise
422 raise
423 finally:
423 finally:
424 self.ui.verbose = verbose
424 self.ui.verbose = verbose
425 # send to memcache
425 # send to memcache
426 count[0] = len(missed)
426 count[0] = len(missed)
427 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
427 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
428 cache.request(request)
428 cache.request(request)
429
429
430 self.ui.progress(_downloading, None)
430 self.ui.progress(_downloading, None)
431
431
432 # mark ourselves as a user of this cache
432 # mark ourselves as a user of this cache
433 writedata.markrepo(self.repo.path)
433 writedata.markrepo(self.repo.path)
434 finally:
434 finally:
435 os.umask(oldumask)
435 os.umask(oldumask)
436
436
437 def receivemissing(self, pipe, filename, node):
437 def receivemissing(self, pipe, filename, node):
438 line = pipe.readline()[:-1]
438 line = pipe.readline()[:-1]
439 if not line:
439 if not line:
440 raise error.ResponseError(_("error downloading file contents:"),
440 raise error.ResponseError(_("error downloading file contents:"),
441 _("connection closed early"))
441 _("connection closed early"))
442 size = int(line)
442 size = int(line)
443 data = pipe.read(size)
443 data = pipe.read(size)
444 if len(data) != size:
444 if len(data) != size:
445 raise error.ResponseError(_("error downloading file contents:"),
445 raise error.ResponseError(_("error downloading file contents:"),
446 _("only received %s of %s bytes")
446 _("only received %s of %s bytes")
447 % (len(data), size))
447 % (len(data), size))
448
448
449 self.writedata.addremotefilelognode(filename, bin(node),
449 self.writedata.addremotefilelognode(filename, bin(node),
450 zlib.decompress(data))
450 zlib.decompress(data))
451
451
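For illustration, the framing receivemissing() expects on the pipe is minimal: for each requested file the peer writes a decimal byte count terminated by a newline, followed by exactly that many bytes of zlib-compressed blob data. A rough sketch of that framing against an in-memory pipe (not the real ssh peer; the encoder is only a hypothetical stand-in for the server side):

import io
import zlib

def encodeblob(blob):
    # server side: length line, then the zlib-compressed payload
    payload = zlib.compress(blob)
    return b"%d\n" % len(payload) + payload

def decodeblob(pipe):
    # client side: mirrors the reads performed by receivemissing() above
    size = int(pipe.readline()[:-1])
    data = pipe.read(size)
    if len(data) != size:
        raise ValueError("connection closed early")
    return zlib.decompress(data)

pipe = io.BytesIO(encodeblob(b"file contents"))
assert decodeblob(pipe) == b"file contents"
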
452 def connect(self):
452 def connect(self):
453 if self.cacheprocess:
453 if self.cacheprocess:
454 cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
454 cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
455 self.remotecache.connect(cmd)
455 self.remotecache.connect(cmd)
456 else:
456 else:
457 # If no cache process is specified, we fake one that always
457 # If no cache process is specified, we fake one that always
458 # returns cache misses. This enables tests to run easily
458 # returns cache misses. This enables tests to run easily
459 # and may eventually allow us to be a drop in replacement
459 # and may eventually allow us to be a drop in replacement
460 # for the largefiles extension.
460 # for the largefiles extension.
461 class simplecache(object):
461 class simplecache(object):
462 def __init__(self):
462 def __init__(self):
463 self.missingids = []
463 self.missingids = []
464 self.connected = True
464 self.connected = True
465
465
466 def close(self):
466 def close(self):
467 pass
467 pass
468
468
469 def request(self, value, flush=True):
469 def request(self, value, flush=True):
470 lines = value.split("\n")
470 lines = value.split("\n")
471 if lines[0] != "get":
471 if lines[0] != "get":
472 return
472 return
473 self.missingids = lines[2:-1]
473 self.missingids = lines[2:-1]
474 self.missingids.append('0')
474 self.missingids.append('0')
475
475
476 def receiveline(self):
476 def receiveline(self):
477 if len(self.missingids) > 0:
477 if len(self.missingids) > 0:
478 return self.missingids.pop(0)
478 return self.missingids.pop(0)
479 return None
479 return None
480
480
481 self.remotecache = simplecache()
481 self.remotecache = simplecache()
482
482
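The cache-daemon protocol this fake stands in for is line oriented: the client sends a request of the shape "get\n<count>\n<key1>\n...\n" (mirroring the "set" request assembled further up in this file) and then reads keys back one per line until a literal "0"; every key echoed back is a miss. A self-contained copy of the fake, shown only to make that round trip concrete:

class fakecache(object):
    # stand-alone rewrite of the simplecache above, for illustration only
    def __init__(self):
        self.missingids = []
    def request(self, value):
        lines = value.split("\n")
        if lines[0] == "get":
            # everything after the count line is a key; terminate with "0"
            self.missingids = lines[2:-1] + ['0']
    def receiveline(self):
        return self.missingids.pop(0) if self.missingids else None

cache = fakecache()
keys = ['a' * 40, 'b' * 40]
cache.request("get\n%d\n%s\n" % (len(keys), "\n".join(keys)))
misses = []
while True:
    line = cache.receiveline()
    if line == '0':        # "0" terminates the reply
        break
    misses.append(line)
assert misses == keys      # the fake reports every key as a cache miss
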
483 def close(self):
483 def close(self):
484 if fetches:
484 if fetches:
485 msg = ("%s files fetched over %d fetches - " +
485 msg = ("%s files fetched over %d fetches - " +
486 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
486 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
487 fetched,
487 fetched,
488 fetches,
488 fetches,
489 fetchmisses,
489 fetchmisses,
490 float(fetched - fetchmisses) / float(fetched) * 100.0,
490 float(fetched - fetchmisses) / float(fetched) * 100.0,
491 fetchcost)
491 fetchcost)
492 if self.debugoutput:
492 if self.debugoutput:
493 self.ui.warn(msg)
493 self.ui.warn(msg)
494 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
494 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
495 remotefilelogfetched=fetched,
495 remotefilelogfetched=fetched,
496 remotefilelogfetches=fetches,
496 remotefilelogfetches=fetches,
497 remotefilelogfetchmisses=fetchmisses,
497 remotefilelogfetchmisses=fetchmisses,
498 remotefilelogfetchtime=fetchcost * 1000)
498 remotefilelogfetchtime=fetchcost * 1000)
499
499
500 if self.remotecache.connected:
500 if self.remotecache.connected:
501 self.remotecache.close()
501 self.remotecache.close()
502
502
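As a quick sanity check of the arithmetic in that message: 1000 files fetched over 4 fetches with 250 cache misses is a 75.00% hit ratio. A hypothetical rendering of the same format string:

fetched, fetches, fetchmisses, fetchcost = 1000, 4, 250, 2.5
ratio = float(fetched - fetchmisses) / float(fetched) * 100.0
assert ratio == 75.0
print(("%s files fetched over %d fetches - "
       "(%d misses, %0.2f%% hit ratio) over %0.2fs")
      % (fetched, fetches, fetchmisses, ratio, fetchcost))
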
503 def prefetch(self, fileids, force=False, fetchdata=True,
503 def prefetch(self, fileids, force=False, fetchdata=True,
504 fetchhistory=False):
504 fetchhistory=False):
505 """downloads the given file versions to the cache
505 """downloads the given file versions to the cache
506 """
506 """
507 repo = self.repo
507 repo = self.repo
508 idstocheck = []
508 idstocheck = []
509 for file, id in fileids:
509 for file, id in fileids:
510 # hack
510 # hack
511 # - we don't use .hgtags
511 # - we don't use .hgtags
512 # - workingctx produces ids with length 42,
512 # - workingctx produces ids with length 42,
513 # which we skip since they aren't in any cache
513 # which we skip since they aren't in any cache
514 if (file == '.hgtags' or len(id) == 42
514 if (file == '.hgtags' or len(id) == 42
515 or not repo.shallowmatch(file)):
515 or not repo.shallowmatch(file)):
516 continue
516 continue
517
517
518 idstocheck.append((file, bin(id)))
518 idstocheck.append((file, bin(id)))
519
519
520 datastore = self.datastore
520 datastore = self.datastore
521 historystore = self.historystore
521 historystore = self.historystore
522 if force:
522 if force:
523 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
523 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
524 historystore = metadatastore.unionmetadatastore(
524 historystore = metadatastore.unionmetadatastore(
525 *repo.sharedhistorystores)
525 *repo.sharedhistorystores)
526
526
527 missingids = set()
527 missingids = set()
528 if fetchdata:
528 if fetchdata:
529 missingids.update(datastore.getmissing(idstocheck))
529 missingids.update(datastore.getmissing(idstocheck))
530 if fetchhistory:
530 if fetchhistory:
531 missingids.update(historystore.getmissing(idstocheck))
531 missingids.update(historystore.getmissing(idstocheck))
532
532
533 # partition missing nodes into nullid and not-nullid so we can
533 # partition missing nodes into nullid and not-nullid so we can
534 # warn about this filtering potentially shadowing bugs.
534 # warn about this filtering potentially shadowing bugs.
535 nullids = len([None for unused, id in missingids if id == nullid])
535 nullids = len([None for unused, id in missingids if id == nullid])
536 if nullids:
536 if nullids:
537 missingids = [(f, id) for f, id in missingids if id != nullid]
537 missingids = [(f, id) for f, id in missingids if id != nullid]
538 repo.ui.develwarn(
538 repo.ui.develwarn(
539 ('remotefilelog not fetching %d null revs'
539 ('remotefilelog not fetching %d null revs'
540 ' - this is likely hiding bugs' % nullids),
540 ' - this is likely hiding bugs' % nullids),
541 config='remotefilelog-ext')
541 config='remotefilelog-ext')
542 if missingids:
542 if missingids:
543 global fetches, fetched, fetchcost
543 global fetches, fetched, fetchcost
544 fetches += 1
544 fetches += 1
545
545
546 # We want to be able to detect excess individual file downloads, so
546 # We want to be able to detect excess individual file downloads, so
547 # let's log that information for debugging.
547 # let's log that information for debugging.
548 if fetches >= 15 and fetches < 18:
548 if fetches >= 15 and fetches < 18:
549 if fetches == 15:
549 if fetches == 15:
550 fetchwarning = self.ui.config('remotefilelog',
550 fetchwarning = self.ui.config('remotefilelog',
551 'fetchwarning')
551 'fetchwarning')
552 if fetchwarning:
552 if fetchwarning:
553 self.ui.warn(fetchwarning + '\n')
553 self.ui.warn(fetchwarning + '\n')
554 self.logstacktrace()
554 self.logstacktrace()
555 missingids = [(file, hex(id)) for file, id in missingids]
555 missingids = [(file, hex(id)) for file, id in missingids]
556 fetched += len(missingids)
556 fetched += len(missingids)
557 start = time.time()
557 start = time.time()
558 missingids = self.request(missingids)
558 missingids = self.request(missingids)
559 if missingids:
559 if missingids:
560 raise error.Abort(_("unable to download %d files") %
560 raise error.Abort(_("unable to download %d files") %
561 len(missingids))
561 len(missingids))
562 fetchcost += time.time() - start
562 fetchcost += time.time() - start
563 self._lfsprefetch(fileids)
563 self._lfsprefetch(fileids)
564
564
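To restate the skip conditions at the top of prefetch(): ids of length 42 come from workingctx and are never in any cache, .hgtags is deliberately left to the normal filelog path, and files outside the shallow pattern are not handled by remotefilelog. A minimal predicate capturing the same checks, with shallowmatch stubbed out as an assumption:

def wantsprefetch(file, id, shallowmatch=lambda f: True):
    # mirrors the filtering at the top of prefetch() above
    if file == '.hgtags':        # kept as a normal filelog
        return False
    if len(id) == 42:            # workingctx pseudo-node, never cached
        return False
    return shallowmatch(file)

assert not wantsprefetch('.hgtags', '0' * 40)
assert not wantsprefetch('somefile', '0' * 42)
assert wantsprefetch('somefile', '0' * 40)
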
565 def _lfsprefetch(self, fileids):
565 def _lfsprefetch(self, fileids):
566 if not _lfsmod or not util.safehasattr(
566 if not _lfsmod or not util.safehasattr(
567 self.repo.svfs, 'lfslocalblobstore'):
567 self.repo.svfs, 'lfslocalblobstore'):
568 return
568 return
569 if not _lfsmod.wrapper.candownload(self.repo):
569 if not _lfsmod.wrapper.candownload(self.repo):
570 return
570 return
571 pointers = []
571 pointers = []
572 store = self.repo.svfs.lfslocalblobstore
572 store = self.repo.svfs.lfslocalblobstore
573 for file, id in fileids:
573 for file, id in fileids:
574 node = bin(id)
574 node = bin(id)
575 rlog = self.repo.file(file)
575 rlog = self.repo.file(file)
576 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
576 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
577 text = rlog.revision(node, raw=True)
577 text = rlog.revision(node, raw=True)
578 p = _lfsmod.pointer.deserialize(text)
578 p = _lfsmod.pointer.deserialize(text)
579 oid = p.oid()
579 oid = p.oid()
580 if not store.has(oid):
580 if not store.has(oid):
581 pointers.append(p)
581 pointers.append(p)
582 if len(pointers) > 0:
582 if len(pointers) > 0:
583 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
583 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
584 assert all(store.has(p.oid()) for p in pointers)
584 assert all(store.has(p.oid()) for p in pointers)
585
585
586 def logstacktrace(self):
586 def logstacktrace(self):
587 import traceback
587 import traceback
588 self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
588 self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
589 ''.join(traceback.format_stack()))
589 ''.join(traceback.format_stack()))
@@ -1,452 +1,452 b''
1 # remotefilelog.py - filelog implementation where filelog history is stored
1 # remotefilelog.py - filelog implementation where filelog history is stored
2 # remotely
2 # remotely
3 #
3 #
4 # Copyright 2013 Facebook, Inc.
4 # Copyright 2013 Facebook, Inc.
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import collections
10 import collections
11 import os
11 import os
12
12
13 from mercurial.node import bin, nullid
13 from mercurial.node import bin, nullid
14 from mercurial.i18n import _
14 from mercurial.i18n import _
15 from mercurial import (
15 from mercurial import (
16 ancestor,
16 ancestor,
17 error,
17 error,
18 mdiff,
18 mdiff,
19 revlog,
19 revlog,
20 )
20 )
21 from mercurial.utils import storageutil
21 from mercurial.utils import storageutil
22
22
23 from . import (
23 from . import (
24 constants,
24 constants,
25 fileserverclient,
25 fileserverclient,
26 shallowutil,
26 shallowutil,
27 )
27 )
28
28
29 class remotefilelognodemap(object):
29 class remotefilelognodemap(object):
30 def __init__(self, filename, store):
30 def __init__(self, filename, store):
31 self._filename = filename
31 self._filename = filename
32 self._store = store
32 self._store = store
33
33
34 def __contains__(self, node):
34 def __contains__(self, node):
35 missing = self._store.getmissing([(self._filename, node)])
35 missing = self._store.getmissing([(self._filename, node)])
36 return not bool(missing)
36 return not bool(missing)
37
37
38 def __get__(self, node):
38 def __get__(self, node):
39 if node not in self:
39 if node not in self:
40 raise KeyError(node)
40 raise KeyError(node)
41 return node
41 return node
42
42
43 class remotefilelog(object):
43 class remotefilelog(object):
44
44
45 _generaldelta = True
45 _generaldelta = True
46
46
47 def __init__(self, opener, path, repo):
47 def __init__(self, opener, path, repo):
48 self.opener = opener
48 self.opener = opener
49 self.filename = path
49 self.filename = path
50 self.repo = repo
50 self.repo = repo
51 self.nodemap = remotefilelognodemap(self.filename, repo.contentstore)
51 self.nodemap = remotefilelognodemap(self.filename, repo.contentstore)
52
52
53 self.version = 1
53 self.version = 1
54
54
55 def read(self, node):
55 def read(self, node):
56 """returns the file contents at this node"""
56 """returns the file contents at this node"""
57 t = self.revision(node)
57 t = self.revision(node)
58 if not t.startswith('\1\n'):
58 if not t.startswith('\1\n'):
59 return t
59 return t
60 s = t.index('\1\n', 2)
60 s = t.index('\1\n', 2)
61 return t[s + 2:]
61 return t[s + 2:]
62
62
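read() is peeling off the standard filelog metadata header: a revision that starts with '\1\n' carries 'key: value' copy metadata up to the next '\1\n', and only what follows is the real file content. A small stand-alone sketch of that header convention (illustrative only, not the extension's API):

def splitmeta(t):
    # same '\1\n ... \1\n' convention handled by read() above
    if not t.startswith('\1\n'):
        return {}, t
    s = t.index('\1\n', 2)
    metablock, body = t[2:s], t[s + 2:]
    meta = dict(line.split(': ', 1) for line in metablock.splitlines())
    return meta, body

text = '\1\ncopy: old.txt\ncopyrev: ' + 'a' * 40 + '\n\1\nreal contents\n'
meta, body = splitmeta(text)
assert meta['copy'] == 'old.txt'
assert body == 'real contents\n'
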
63 def add(self, text, meta, transaction, linknode, p1=None, p2=None):
63 def add(self, text, meta, transaction, linknode, p1=None, p2=None):
64 hashtext = text
64 hashtext = text
65
65
66 # hash with the metadata, like in vanilla filelogs
66 # hash with the metadata, like in vanilla filelogs
67 hashtext = shallowutil.createrevlogtext(text, meta.get('copy'),
67 hashtext = shallowutil.createrevlogtext(text, meta.get('copy'),
68 meta.get('copyrev'))
68 meta.get('copyrev'))
69 node = storageutil.hashrevisionsha1(hashtext, p1, p2)
69 node = storageutil.hashrevisionsha1(hashtext, p1, p2)
70 return self.addrevision(hashtext, transaction, linknode, p1, p2,
70 return self.addrevision(hashtext, transaction, linknode, p1, p2,
71 node=node)
71 node=node)
72
72
73 def _createfileblob(self, text, meta, flags, p1, p2, node, linknode):
73 def _createfileblob(self, text, meta, flags, p1, p2, node, linknode):
74 # text passed to "_createfileblob" does not include filelog metadata
74 # text passed to "_createfileblob" does not include filelog metadata
75 header = shallowutil.buildfileblobheader(len(text), flags)
75 header = shallowutil.buildfileblobheader(len(text), flags)
76 data = "%s\0%s" % (header, text)
76 data = "%s\0%s" % (header, text)
77
77
78 realp1 = p1
78 realp1 = p1
79 copyfrom = ""
79 copyfrom = ""
80 if meta and 'copy' in meta:
80 if meta and 'copy' in meta:
81 copyfrom = meta['copy']
81 copyfrom = meta['copy']
82 realp1 = bin(meta['copyrev'])
82 realp1 = bin(meta['copyrev'])
83
83
84 data += "%s%s%s%s%s\0" % (node, realp1, p2, linknode, copyfrom)
84 data += "%s%s%s%s%s\0" % (node, realp1, p2, linknode, copyfrom)
85
85
86 visited = set()
86 visited = set()
87
87
88 pancestors = {}
88 pancestors = {}
89 queue = []
89 queue = []
90 if realp1 != nullid:
90 if realp1 != nullid:
91 p1flog = self
91 p1flog = self
92 if copyfrom:
92 if copyfrom:
93 p1flog = remotefilelog(self.opener, copyfrom, self.repo)
93 p1flog = remotefilelog(self.opener, copyfrom, self.repo)
94
94
95 pancestors.update(p1flog.ancestormap(realp1))
95 pancestors.update(p1flog.ancestormap(realp1))
96 queue.append(realp1)
96 queue.append(realp1)
97 visited.add(realp1)
97 visited.add(realp1)
98 if p2 != nullid:
98 if p2 != nullid:
99 pancestors.update(self.ancestormap(p2))
99 pancestors.update(self.ancestormap(p2))
100 queue.append(p2)
100 queue.append(p2)
101 visited.add(p2)
101 visited.add(p2)
102
102
103 ancestortext = ""
103 ancestortext = ""
104
104
105 # add the ancestors in topological order
105 # add the ancestors in topological order
106 while queue:
106 while queue:
107 c = queue.pop(0)
107 c = queue.pop(0)
108 pa1, pa2, ancestorlinknode, pacopyfrom = pancestors[c]
108 pa1, pa2, ancestorlinknode, pacopyfrom = pancestors[c]
109
109
110 pacopyfrom = pacopyfrom or ''
110 pacopyfrom = pacopyfrom or ''
111 ancestortext += "%s%s%s%s%s\0" % (
111 ancestortext += "%s%s%s%s%s\0" % (
112 c, pa1, pa2, ancestorlinknode, pacopyfrom)
112 c, pa1, pa2, ancestorlinknode, pacopyfrom)
113
113
114 if pa1 != nullid and pa1 not in visited:
114 if pa1 != nullid and pa1 not in visited:
115 queue.append(pa1)
115 queue.append(pa1)
116 visited.add(pa1)
116 visited.add(pa1)
117 if pa2 != nullid and pa2 not in visited:
117 if pa2 != nullid and pa2 not in visited:
118 queue.append(pa2)
118 queue.append(pa2)
119 visited.add(pa2)
119 visited.add(pa2)
120
120
121 data += ancestortext
121 data += ancestortext
122
122
123 return data
123 return data
124
124
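The blob assembled here has a fixed layout: a size/flags header, a NUL byte, the raw text, then one record for the node itself and each ancestor, consisting of four 20-byte binary nodes (node, p1, p2, linknode) followed by a NUL-terminated copyfrom path, in topological order. A hypothetical parser for that ancestor section, assuming the caller has already located where the text ends:

def parseancestors(ancestortext):
    # each record: 20-byte node | p1 | p2 | linknode | copyfrom | NUL
    offset, amap = 0, {}
    while offset < len(ancestortext):
        node = ancestortext[offset:offset + 20]
        p1 = ancestortext[offset + 20:offset + 40]
        p2 = ancestortext[offset + 40:offset + 60]
        linknode = ancestortext[offset + 60:offset + 80]
        end = ancestortext.index(b'\0', offset + 80)
        copyfrom = ancestortext[offset + 80:end]
        amap[node] = (p1, p2, linknode, copyfrom)
        offset = end + 1
    return amap

nullid = b'\0' * 20
record = b'N' * 20 + nullid + nullid + b'L' * 20 + b'\0'
assert parseancestors(record)[b'N' * 20] == (nullid, nullid, b'L' * 20, b'')
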
125 def addrevision(self, text, transaction, linknode, p1, p2, cachedelta=None,
125 def addrevision(self, text, transaction, linknode, p1, p2, cachedelta=None,
126 node=None, flags=revlog.REVIDX_DEFAULT_FLAGS):
126 node=None, flags=revlog.REVIDX_DEFAULT_FLAGS):
127 # text passed to "addrevision" includes hg filelog metadata header
127 # text passed to "addrevision" includes hg filelog metadata header
128 if node is None:
128 if node is None:
129 node = storageutil.hashrevisionsha1(text, p1, p2)
129 node = storageutil.hashrevisionsha1(text, p1, p2)
130
130
131 meta, metaoffset = storageutil.parsemeta(text)
131 meta, metaoffset = storageutil.parsemeta(text)
132 rawtext, validatehash = self._processflags(text, flags, 'write')
132 rawtext, validatehash = self._processflags(text, flags, 'write')
133 return self.addrawrevision(rawtext, transaction, linknode, p1, p2,
133 return self.addrawrevision(rawtext, transaction, linknode, p1, p2,
134 node, flags, cachedelta,
134 node, flags, cachedelta,
135 _metatuple=(meta, metaoffset))
135 _metatuple=(meta, metaoffset))
136
136
137 def addrawrevision(self, rawtext, transaction, linknode, p1, p2, node,
137 def addrawrevision(self, rawtext, transaction, linknode, p1, p2, node,
138 flags, cachedelta=None, _metatuple=None):
138 flags, cachedelta=None, _metatuple=None):
139 if _metatuple:
139 if _metatuple:
140 # _metatuple: used by "addrevision" internally by remotefilelog
140 # _metatuple: used by "addrevision" internally by remotefilelog
141 # meta was parsed confidently
141 # meta was parsed confidently
142 meta, metaoffset = _metatuple
142 meta, metaoffset = _metatuple
143 else:
143 else:
144 # not from self.addrevision, but something else (repo._filecommit)
144 # not from self.addrevision, but something else (repo._filecommit)
145 # calls addrawrevision directly. remotefilelog needs to get and
145 # calls addrawrevision directly. remotefilelog needs to get and
146 # strip filelog metadata.
146 # strip filelog metadata.
147 # we don't have confidence about whether rawtext contains filelog
147 # we don't have confidence about whether rawtext contains filelog
148 # metadata or not (flag processor could replace it), so we just
148 # metadata or not (flag processor could replace it), so we just
149 # parse it as best-effort.
149 # parse it as best-effort.
150 # in LFS (flags != 0)'s case, the best way is to call LFS code to
150 # in LFS (flags != 0)'s case, the best way is to call LFS code to
151 # get the meta information, instead of storageutil.parsemeta.
151 # get the meta information, instead of storageutil.parsemeta.
152 meta, metaoffset = storageutil.parsemeta(rawtext)
152 meta, metaoffset = storageutil.parsemeta(rawtext)
153 if flags != 0:
153 if flags != 0:
154 # when flags != 0, be conservative and do not mangle rawtext, since
154 # when flags != 0, be conservative and do not mangle rawtext, since
155 # a read flag processor expects the text not being mangled at all.
155 # a read flag processor expects the text not being mangled at all.
156 metaoffset = 0
156 metaoffset = 0
157 if metaoffset:
157 if metaoffset:
158 # remotefilelog fileblob stores copy metadata in its ancestortext,
158 # remotefilelog fileblob stores copy metadata in its ancestortext,
159 # not its main blob. so we need to remove filelog metadata
159 # not its main blob. so we need to remove filelog metadata
160 # (containing copy information) from text.
160 # (containing copy information) from text.
161 blobtext = rawtext[metaoffset:]
161 blobtext = rawtext[metaoffset:]
162 else:
162 else:
163 blobtext = rawtext
163 blobtext = rawtext
164 data = self._createfileblob(blobtext, meta, flags, p1, p2, node,
164 data = self._createfileblob(blobtext, meta, flags, p1, p2, node,
165 linknode)
165 linknode)
166 self.repo.contentstore.addremotefilelognode(self.filename, node, data)
166 self.repo.contentstore.addremotefilelognode(self.filename, node, data)
167
167
168 return node
168 return node
169
169
170 def renamed(self, node):
170 def renamed(self, node):
171 ancestors = self.repo.metadatastore.getancestors(self.filename, node)
171 ancestors = self.repo.metadatastore.getancestors(self.filename, node)
172 p1, p2, linknode, copyfrom = ancestors[node]
172 p1, p2, linknode, copyfrom = ancestors[node]
173 if copyfrom:
173 if copyfrom:
174 return (copyfrom, p1)
174 return (copyfrom, p1)
175
175
176 return False
176 return False
177
177
178 def size(self, node):
178 def size(self, node):
179 """return the size of a given revision"""
179 """return the size of a given revision"""
180 return len(self.read(node))
180 return len(self.read(node))
181
181
182 rawsize = size
182 rawsize = size
183
183
184 def cmp(self, node, text):
184 def cmp(self, node, text):
185 """compare text with a given file revision
185 """compare text with a given file revision
186
186
187 returns True if text is different than what is stored.
187 returns True if text is different than what is stored.
188 """
188 """
189
189
190 if node == nullid:
190 if node == nullid:
191 return True
191 return True
192
192
193 nodetext = self.read(node)
193 nodetext = self.read(node)
194 return nodetext != text
194 return nodetext != text
195
195
196 def __nonzero__(self):
196 def __nonzero__(self):
197 return True
197 return True
198
198
199 def __len__(self):
199 def __len__(self):
200 if self.filename == '.hgtags':
200 if self.filename == '.hgtags':
201 # The length of .hgtags is used to fast path tag checking.
201 # The length of .hgtags is used to fast path tag checking.
202 # remotefilelog doesn't support .hgtags since the entire .hgtags
202 # remotefilelog doesn't support .hgtags since the entire .hgtags
203 # history is needed. Use the excludepattern setting to make
203 # history is needed. Use the excludepattern setting to make
204 # .hgtags a normal filelog.
204 # .hgtags a normal filelog.
205 return 0
205 return 0
206
206
207 raise RuntimeError("len not supported")
207 raise RuntimeError("len not supported")
208
208
209 def empty(self):
209 def empty(self):
210 return False
210 return False
211
211
212 def flags(self, node):
212 def flags(self, node):
213 if isinstance(node, int):
213 if isinstance(node, int):
214 raise error.ProgrammingError(
214 raise error.ProgrammingError(
215 'remotefilelog does not accept integer rev for flags')
215 'remotefilelog does not accept integer rev for flags')
216 store = self.repo.contentstore
216 store = self.repo.contentstore
217 return store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
217 return store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
218
218
219 def parents(self, node):
219 def parents(self, node):
220 if node == nullid:
220 if node == nullid:
221 return nullid, nullid
221 return nullid, nullid
222
222
223 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
223 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
224 p1, p2, linknode, copyfrom = ancestormap[node]
224 p1, p2, linknode, copyfrom = ancestormap[node]
225 if copyfrom:
225 if copyfrom:
226 p1 = nullid
226 p1 = nullid
227
227
228 return p1, p2
228 return p1, p2
229
229
230 def parentrevs(self, rev):
230 def parentrevs(self, rev):
231 # TODO(augie): this is a node and should be a rev, but for now
231 # TODO(augie): this is a node and should be a rev, but for now
232 # nothing in core seems to actually break.
232 # nothing in core seems to actually break.
233 return self.parents(rev)
233 return self.parents(rev)
234
234
235 def linknode(self, node):
235 def linknode(self, node):
236 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
236 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
237 p1, p2, linknode, copyfrom = ancestormap[node]
237 p1, p2, linknode, copyfrom = ancestormap[node]
238 return linknode
238 return linknode
239
239
240 def linkrev(self, node):
240 def linkrev(self, node):
241 return self.repo.unfiltered().changelog.rev(self.linknode(node))
241 return self.repo.unfiltered().changelog.rev(self.linknode(node))
242
242
243 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
243 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
244 assumehaveparentrevisions=False, deltaprevious=False,
244 assumehaveparentrevisions=False, deltaprevious=False,
245 deltamode=None):
245 deltamode=None):
246 # we don't use any of these parameters here
246 # we don't use any of these parameters here
247 del nodesorder, revisiondata, assumehaveparentrevisions, deltaprevious
247 del nodesorder, revisiondata, assumehaveparentrevisions, deltaprevious
248 del deltamode
248 del deltamode
249 prevnode = None
249 prevnode = None
250 for node in nodes:
250 for node in nodes:
251 p1, p2 = self.parents(node)
251 p1, p2 = self.parents(node)
252 if prevnode is None:
252 if prevnode is None:
253 basenode = prevnode = p1
253 basenode = prevnode = p1
254 if basenode == node:
254 if basenode == node:
255 basenode = nullid
255 basenode = nullid
256 if basenode != nullid:
256 if basenode != nullid:
257 revision = None
257 revision = None
258 delta = self.revdiff(basenode, node)
258 delta = self.revdiff(basenode, node)
259 else:
259 else:
260 revision = self.revision(node, raw=True)
260 revision = self.revision(node, raw=True)
261 delta = None
261 delta = None
262 yield revlog.revlogrevisiondelta(
262 yield revlog.revlogrevisiondelta(
263 node=node,
263 node=node,
264 p1node=p1,
264 p1node=p1,
265 p2node=p2,
265 p2node=p2,
266 linknode=self.linknode(node),
266 linknode=self.linknode(node),
267 basenode=basenode,
267 basenode=basenode,
268 flags=self.flags(node),
268 flags=self.flags(node),
269 baserevisionsize=None,
269 baserevisionsize=None,
270 revision=revision,
270 revision=revision,
271 delta=delta,
271 delta=delta,
272 )
272 )
273
273
274 def revdiff(self, node1, node2):
274 def revdiff(self, node1, node2):
275 return mdiff.textdiff(self.revision(node1, raw=True),
275 return mdiff.textdiff(self.revision(node1, raw=True),
276 self.revision(node2, raw=True))
276 self.revision(node2, raw=True))
277
277
278 def lookup(self, node):
278 def lookup(self, node):
279 if len(node) == 40:
279 if len(node) == 40:
280 node = bin(node)
280 node = bin(node)
281 if len(node) != 20:
281 if len(node) != 20:
282 raise error.LookupError(node, self.filename,
282 raise error.LookupError(node, self.filename,
283 _('invalid lookup input'))
283 _('invalid lookup input'))
284
284
285 return node
285 return node
286
286
287 def rev(self, node):
287 def rev(self, node):
288 # This is a hack to make TortoiseHG work.
288 # This is a hack to make TortoiseHG work.
289 return node
289 return node
290
290
291 def node(self, rev):
291 def node(self, rev):
292 # This is a hack.
292 # This is a hack.
293 if isinstance(rev, int):
293 if isinstance(rev, int):
294 raise error.ProgrammingError(
294 raise error.ProgrammingError(
295 'remotefilelog does not convert integer rev to node')
295 'remotefilelog does not convert integer rev to node')
296 return rev
296 return rev
297
297
298 def revision(self, node, raw=False):
298 def revision(self, node, raw=False):
299 """returns the revlog contents at this node.
299 """returns the revlog contents at this node.
300 this includes the meta data traditionally included in file revlogs.
300 this includes the meta data traditionally included in file revlogs.
301 this is generally only used for bundling and communicating with vanilla
301 this is generally only used for bundling and communicating with vanilla
302 hg clients.
302 hg clients.
303 """
303 """
304 if node == nullid:
304 if node == nullid:
305 return ""
305 return ""
306 if len(node) != 20:
306 if len(node) != 20:
307 raise error.LookupError(node, self.filename,
307 raise error.LookupError(node, self.filename,
308 _('invalid revision input'))
308 _('invalid revision input'))
309
309
310 store = self.repo.contentstore
310 store = self.repo.contentstore
311 rawtext = store.get(self.filename, node)
311 rawtext = store.get(self.filename, node)
312 if raw:
312 if raw:
313 return rawtext
313 return rawtext
314 flags = store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
314 flags = store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
315 if flags == 0:
315 if flags == 0:
316 return rawtext
316 return rawtext
317 text, verifyhash = self._processflags(rawtext, flags, 'read')
317 text, verifyhash = self._processflags(rawtext, flags, 'read')
318 return text
318 return text
319
319
320 def _processflags(self, text, flags, operation, raw=False):
320 def _processflags(self, text, flags, operation, raw=False):
321 # mostly copied from hg/mercurial/revlog.py
321 # mostly copied from hg/mercurial/revlog.py
322 validatehash = True
322 validatehash = True
323 orderedflags = revlog.REVIDX_FLAGS_ORDER
323 orderedflags = revlog.REVIDX_FLAGS_ORDER
324 if operation == 'write':
324 if operation == 'write':
325 orderedflags = reversed(orderedflags)
325 orderedflags = reversed(orderedflags)
326 for flag in orderedflags:
326 for flag in orderedflags:
327 if flag & flags:
327 if flag & flags:
328 vhash = True
328 vhash = True
329 if flag not in revlog._flagprocessors:
329 if flag not in revlog._flagprocessors:
330 message = _("missing processor for flag '%#x'") % (flag)
330 message = _("missing processor for flag '%#x'") % (flag)
331 raise revlog.RevlogError(message)
331 raise revlog.RevlogError(message)
332 readfunc, writefunc, rawfunc = revlog._flagprocessors[flag]
332 readfunc, writefunc, rawfunc = revlog._flagprocessors[flag]
333 if raw:
333 if raw:
334 vhash = rawfunc(self, text)
334 vhash = rawfunc(self, text)
335 elif operation == 'read':
335 elif operation == 'read':
336 text, vhash = readfunc(self, text)
336 text, vhash = readfunc(self, text)
337 elif operation == 'write':
337 elif operation == 'write':
338 text, vhash = writefunc(self, text)
338 text, vhash = writefunc(self, text)
339 validatehash = validatehash and vhash
339 validatehash = validatehash and vhash
340 return text, validatehash
340 return text, validatehash
341
341
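The only subtlety in _processflags() is ordering: flags are walked in revlog.REVIDX_FLAGS_ORDER, and that order is reversed for writes, so a transform applied last on the way in is undone first on the way out. A toy illustration of that symmetry with two made-up processors (this is not the real revlog registry, just the ordering idea):

# toy 'flag processors' as (readfunc, writefunc) pairs, in read order
procs = [
    (lambda t: t[2:], lambda t: 'A:' + t),     # add/strip a prefix
    (lambda t: t[::-1], lambda t: t[::-1]),    # reverse the text
]

def towire(text):
    # 'write' path: apply processors in reversed order
    for readf, writef in reversed(procs):
        text = writef(text)
    return text

def fromwire(text):
    # 'read' path: apply processors in declared order
    for readf, writef in procs:
        text = readf(text)
    return text

assert towire('hi') == 'A:ih'
assert fromwire(towire('hi')) == 'hi'
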
342 def _read(self, id):
342 def _read(self, id):
343 """reads the raw file blob from disk, cache, or server"""
343 """reads the raw file blob from disk, cache, or server"""
344 fileservice = self.repo.fileservice
344 fileservice = self.repo.fileservice
345 localcache = fileservice.localcache
345 localcache = fileservice.localcache
346 cachekey = fileserverclient.getcachekey(self.repo.name, self.filename,
346 cachekey = fileserverclient.getcachekey(self.repo.name, self.filename,
347 id)
347 id)
348 try:
348 try:
349 return localcache.read(cachekey)
349 return localcache.read(cachekey)
350 except KeyError:
350 except KeyError:
351 pass
351 pass
352
352
353 localkey = fileserverclient.getlocalkey(self.filename, id)
353 localkey = fileserverclient.getlocalkey(self.filename, id)
354 localpath = os.path.join(self.localpath, localkey)
354 localpath = os.path.join(self.localpath, localkey)
355 try:
355 try:
356 return shallowutil.readfile(localpath)
356 return shallowutil.readfile(localpath)
357 except IOError:
357 except IOError:
358 pass
358 pass
359
359
360 fileservice.prefetch([(self.filename, id)])
360 fileservice.prefetch([(self.filename, id)])
361 try:
361 try:
362 return localcache.read(cachekey)
362 return localcache.read(cachekey)
363 except KeyError:
363 except KeyError:
364 pass
364 pass
365
365
366 raise error.LookupError(id, self.filename, _('no node'))
366 raise error.LookupError(id, self.filename, _('no node'))
367
367
368 def ancestormap(self, node):
368 def ancestormap(self, node):
369 return self.repo.metadatastore.getancestors(self.filename, node)
369 return self.repo.metadatastore.getancestors(self.filename, node)
370
370
371 def ancestor(self, a, b):
371 def ancestor(self, a, b):
372 if a == nullid or b == nullid:
372 if a == nullid or b == nullid:
373 return nullid
373 return nullid
374
374
375 revmap, parentfunc = self._buildrevgraph(a, b)
375 revmap, parentfunc = self._buildrevgraph(a, b)
376 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
376 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
377
377
378 ancs = ancestor.ancestors(parentfunc, revmap[a], revmap[b])
378 ancs = ancestor.ancestors(parentfunc, revmap[a], revmap[b])
379 if ancs:
379 if ancs:
380 # choose a consistent winner when there's a tie
380 # choose a consistent winner when there's a tie
381 return min(map(nodemap.__getitem__, ancs))
381 return min(map(nodemap.__getitem__, ancs))
382 return nullid
382 return nullid
383
383
384 def commonancestorsheads(self, a, b):
384 def commonancestorsheads(self, a, b):
385 """calculate all the heads of the common ancestors of nodes a and b"""
385 """calculate all the heads of the common ancestors of nodes a and b"""
386
386
387 if a == nullid or b == nullid:
387 if a == nullid or b == nullid:
388 return nullid
388 return nullid
389
389
390 revmap, parentfunc = self._buildrevgraph(a, b)
390 revmap, parentfunc = self._buildrevgraph(a, b)
391 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
391 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
392
392
393 ancs = ancestor.commonancestorsheads(parentfunc, revmap[a], revmap[b])
393 ancs = ancestor.commonancestorsheads(parentfunc, revmap[a], revmap[b])
394 return map(nodemap.__getitem__, ancs)
394 return map(nodemap.__getitem__, ancs)
395
395
396 def _buildrevgraph(self, a, b):
396 def _buildrevgraph(self, a, b):
397 """Builds a numeric revision graph for the given two nodes.
397 """Builds a numeric revision graph for the given two nodes.
398 Returns a node->rev map and a rev->[revs] parent function.
398 Returns a node->rev map and a rev->[revs] parent function.
399 """
399 """
400 amap = self.ancestormap(a)
400 amap = self.ancestormap(a)
401 bmap = self.ancestormap(b)
401 bmap = self.ancestormap(b)
402
402
403 # Union the two maps
403 # Union the two maps
404 parentsmap = collections.defaultdict(list)
404 parentsmap = collections.defaultdict(list)
405 allparents = set()
405 allparents = set()
406 for mapping in (amap, bmap):
406 for mapping in (amap, bmap):
407 for node, pdata in mapping.iteritems():
407 for node, pdata in mapping.iteritems():
408 parents = parentsmap[node]
408 parents = parentsmap[node]
409 p1, p2, linknode, copyfrom = pdata
409 p1, p2, linknode, copyfrom = pdata
410 # Don't follow renames (copyfrom).
410 # Don't follow renames (copyfrom).
411 # remotefilectx.ancestor does that.
411 # remotefilectx.ancestor does that.
412 if p1 != nullid and not copyfrom:
412 if p1 != nullid and not copyfrom:
413 parents.append(p1)
413 parents.append(p1)
414 allparents.add(p1)
414 allparents.add(p1)
415 if p2 != nullid:
415 if p2 != nullid:
416 parents.append(p2)
416 parents.append(p2)
417 allparents.add(p2)
417 allparents.add(p2)
418
418
419 # Breadth first traversal to build linkrev graph
419 # Breadth first traversal to build linkrev graph
420 parentrevs = collections.defaultdict(list)
420 parentrevs = collections.defaultdict(list)
421 revmap = {}
421 revmap = {}
422 queue = collections.deque(((None, n) for n in parentsmap.iterkeys()
422 queue = collections.deque(((None, n) for n in parentsmap
423 if n not in allparents))
423 if n not in allparents))
424 while queue:
424 while queue:
425 prevrev, current = queue.pop()
425 prevrev, current = queue.pop()
426 if current in revmap:
426 if current in revmap:
427 if prevrev:
427 if prevrev:
428 parentrevs[prevrev].append(revmap[current])
428 parentrevs[prevrev].append(revmap[current])
429 continue
429 continue
430
430
431 # Assign linkrevs in reverse order, so start at
431 # Assign linkrevs in reverse order, so start at
432 # len(parentsmap) and work backwards.
432 # len(parentsmap) and work backwards.
433 currentrev = len(parentsmap) - len(revmap) - 1
433 currentrev = len(parentsmap) - len(revmap) - 1
434 revmap[current] = currentrev
434 revmap[current] = currentrev
435
435
436 if prevrev:
436 if prevrev:
437 parentrevs[prevrev].append(currentrev)
437 parentrevs[prevrev].append(currentrev)
438
438
439 for parent in parentsmap.get(current):
439 for parent in parentsmap.get(current):
440 queue.appendleft((currentrev, parent))
440 queue.appendleft((currentrev, parent))
441
441
442 return revmap, parentrevs.__getitem__
442 return revmap, parentrevs.__getitem__
443
443
444 def strip(self, minlink, transaction):
444 def strip(self, minlink, transaction):
445 pass
445 pass
446
446
447 # misc unused things
447 # misc unused things
448 def files(self):
448 def files(self):
449 return []
449 return []
450
450
451 def checksize(self):
451 def checksize(self):
452 return 0, 0
452 return 0, 0
@@ -1,781 +1,781 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import os
3 import os
4 import time
4 import time
5
5
6 from mercurial.i18n import _
6 from mercurial.i18n import _
7 from mercurial.node import (
7 from mercurial.node import (
8 nullid,
8 nullid,
9 short,
9 short,
10 )
10 )
11 from mercurial import (
11 from mercurial import (
12 encoding,
12 encoding,
13 error,
13 error,
14 mdiff,
14 mdiff,
15 policy,
15 policy,
16 pycompat,
16 pycompat,
17 scmutil,
17 scmutil,
18 util,
18 util,
19 vfs,
19 vfs,
20 )
20 )
21 from mercurial.utils import procutil
21 from mercurial.utils import procutil
22 from . import (
22 from . import (
23 constants,
23 constants,
24 contentstore,
24 contentstore,
25 datapack,
25 datapack,
26 extutil,
26 extutil,
27 historypack,
27 historypack,
28 metadatastore,
28 metadatastore,
29 shallowutil,
29 shallowutil,
30 )
30 )
31
31
32 osutil = policy.importmod(r'osutil')
32 osutil = policy.importmod(r'osutil')
33
33
34 class RepackAlreadyRunning(error.Abort):
34 class RepackAlreadyRunning(error.Abort):
35 pass
35 pass
36
36
37 if util.safehasattr(util, '_hgexecutable'):
37 if util.safehasattr(util, '_hgexecutable'):
38 # Before 5be286db
38 # Before 5be286db
39 _hgexecutable = util.hgexecutable
39 _hgexecutable = util.hgexecutable
40 else:
40 else:
41 from mercurial.utils import procutil
41 from mercurial.utils import procutil
42 _hgexecutable = procutil.hgexecutable
42 _hgexecutable = procutil.hgexecutable
43
43
44 def backgroundrepack(repo, incremental=True, packsonly=False):
44 def backgroundrepack(repo, incremental=True, packsonly=False):
45 cmd = [_hgexecutable(), '-R', repo.origroot, 'repack']
45 cmd = [_hgexecutable(), '-R', repo.origroot, 'repack']
46 msg = _("(running background repack)\n")
46 msg = _("(running background repack)\n")
47 if incremental:
47 if incremental:
48 cmd.append('--incremental')
48 cmd.append('--incremental')
49 msg = _("(running background incremental repack)\n")
49 msg = _("(running background incremental repack)\n")
50 if packsonly:
50 if packsonly:
51 cmd.append('--packsonly')
51 cmd.append('--packsonly')
52 repo.ui.warn(msg)
52 repo.ui.warn(msg)
53 procutil.runbgcommand(cmd, encoding.environ)
53 procutil.runbgcommand(cmd, encoding.environ)
54
54
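Concretely, with incremental=True and packsonly left off, the command launched in the background is just the current hg binary re-run against the same repository root, for example (paths here are made up):

# hypothetical values: _hgexecutable() resolves the running hg binary
cmd = ['/usr/bin/hg', '-R', '/srv/repos/myrepo', 'repack', '--incremental']
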
55 def fullrepack(repo, options=None):
55 def fullrepack(repo, options=None):
56 """If ``packsonly`` is True, stores creating only loose objects are skipped.
56 """If ``packsonly`` is True, stores creating only loose objects are skipped.
57 """
57 """
58 if util.safehasattr(repo, 'shareddatastores'):
58 if util.safehasattr(repo, 'shareddatastores'):
59 datasource = contentstore.unioncontentstore(
59 datasource = contentstore.unioncontentstore(
60 *repo.shareddatastores)
60 *repo.shareddatastores)
61 historysource = metadatastore.unionmetadatastore(
61 historysource = metadatastore.unionmetadatastore(
62 *repo.sharedhistorystores,
62 *repo.sharedhistorystores,
63 allowincomplete=True)
63 allowincomplete=True)
64
64
65 packpath = shallowutil.getcachepackpath(
65 packpath = shallowutil.getcachepackpath(
66 repo,
66 repo,
67 constants.FILEPACK_CATEGORY)
67 constants.FILEPACK_CATEGORY)
68 _runrepack(repo, datasource, historysource, packpath,
68 _runrepack(repo, datasource, historysource, packpath,
69 constants.FILEPACK_CATEGORY, options=options)
69 constants.FILEPACK_CATEGORY, options=options)
70
70
71 if util.safehasattr(repo.manifestlog, 'datastore'):
71 if util.safehasattr(repo.manifestlog, 'datastore'):
72 localdata, shareddata = _getmanifeststores(repo)
72 localdata, shareddata = _getmanifeststores(repo)
73 lpackpath, ldstores, lhstores = localdata
73 lpackpath, ldstores, lhstores = localdata
74 spackpath, sdstores, shstores = shareddata
74 spackpath, sdstores, shstores = shareddata
75
75
76 # Repack the shared manifest store
76 # Repack the shared manifest store
77 datasource = contentstore.unioncontentstore(*sdstores)
77 datasource = contentstore.unioncontentstore(*sdstores)
78 historysource = metadatastore.unionmetadatastore(
78 historysource = metadatastore.unionmetadatastore(
79 *shstores,
79 *shstores,
80 allowincomplete=True)
80 allowincomplete=True)
81 _runrepack(repo, datasource, historysource, spackpath,
81 _runrepack(repo, datasource, historysource, spackpath,
82 constants.TREEPACK_CATEGORY, options=options)
82 constants.TREEPACK_CATEGORY, options=options)
83
83
84 # Repack the local manifest store
84 # Repack the local manifest store
85 datasource = contentstore.unioncontentstore(
85 datasource = contentstore.unioncontentstore(
86 *ldstores,
86 *ldstores,
87 allowincomplete=True)
87 allowincomplete=True)
88 historysource = metadatastore.unionmetadatastore(
88 historysource = metadatastore.unionmetadatastore(
89 *lhstores,
89 *lhstores,
90 allowincomplete=True)
90 allowincomplete=True)
91 _runrepack(repo, datasource, historysource, lpackpath,
91 _runrepack(repo, datasource, historysource, lpackpath,
92 constants.TREEPACK_CATEGORY, options=options)
92 constants.TREEPACK_CATEGORY, options=options)
93
93
94 def incrementalrepack(repo, options=None):
94 def incrementalrepack(repo, options=None):
95 """This repacks the repo by looking at the distribution of pack files in the
95 """This repacks the repo by looking at the distribution of pack files in the
96 repo and performing the most minimal repack to keep the repo in good shape.
96 repo and performing the most minimal repack to keep the repo in good shape.
97 """
97 """
98 if util.safehasattr(repo, 'shareddatastores'):
98 if util.safehasattr(repo, 'shareddatastores'):
99 packpath = shallowutil.getcachepackpath(
99 packpath = shallowutil.getcachepackpath(
100 repo,
100 repo,
101 constants.FILEPACK_CATEGORY)
101 constants.FILEPACK_CATEGORY)
102 _incrementalrepack(repo,
102 _incrementalrepack(repo,
103 repo.shareddatastores,
103 repo.shareddatastores,
104 repo.sharedhistorystores,
104 repo.sharedhistorystores,
105 packpath,
105 packpath,
106 constants.FILEPACK_CATEGORY,
106 constants.FILEPACK_CATEGORY,
107 options=options)
107 options=options)
108
108
109 if util.safehasattr(repo.manifestlog, 'datastore'):
109 if util.safehasattr(repo.manifestlog, 'datastore'):
110 localdata, shareddata = _getmanifeststores(repo)
110 localdata, shareddata = _getmanifeststores(repo)
111 lpackpath, ldstores, lhstores = localdata
111 lpackpath, ldstores, lhstores = localdata
112 spackpath, sdstores, shstores = shareddata
112 spackpath, sdstores, shstores = shareddata
113
113
114 # Repack the shared manifest store
114 # Repack the shared manifest store
115 _incrementalrepack(repo,
115 _incrementalrepack(repo,
116 sdstores,
116 sdstores,
117 shstores,
117 shstores,
118 spackpath,
118 spackpath,
119 constants.TREEPACK_CATEGORY,
119 constants.TREEPACK_CATEGORY,
120 options=options)
120 options=options)
121
121
122 # Repack the local manifest store
122 # Repack the local manifest store
123 _incrementalrepack(repo,
123 _incrementalrepack(repo,
124 ldstores,
124 ldstores,
125 lhstores,
125 lhstores,
126 lpackpath,
126 lpackpath,
127 constants.TREEPACK_CATEGORY,
127 constants.TREEPACK_CATEGORY,
128 allowincompletedata=True,
128 allowincompletedata=True,
129 options=options)
129 options=options)
130
130
131 def _getmanifeststores(repo):
131 def _getmanifeststores(repo):
132 shareddatastores = repo.manifestlog.shareddatastores
132 shareddatastores = repo.manifestlog.shareddatastores
133 localdatastores = repo.manifestlog.localdatastores
133 localdatastores = repo.manifestlog.localdatastores
134 sharedhistorystores = repo.manifestlog.sharedhistorystores
134 sharedhistorystores = repo.manifestlog.sharedhistorystores
135 localhistorystores = repo.manifestlog.localhistorystores
135 localhistorystores = repo.manifestlog.localhistorystores
136
136
137 sharedpackpath = shallowutil.getcachepackpath(repo,
137 sharedpackpath = shallowutil.getcachepackpath(repo,
138 constants.TREEPACK_CATEGORY)
138 constants.TREEPACK_CATEGORY)
139 localpackpath = shallowutil.getlocalpackpath(repo.svfs.vfs.base,
139 localpackpath = shallowutil.getlocalpackpath(repo.svfs.vfs.base,
140 constants.TREEPACK_CATEGORY)
140 constants.TREEPACK_CATEGORY)
141
141
142 return ((localpackpath, localdatastores, localhistorystores),
142 return ((localpackpath, localdatastores, localhistorystores),
143 (sharedpackpath, shareddatastores, sharedhistorystores))
143 (sharedpackpath, shareddatastores, sharedhistorystores))
144
144
145 def _topacks(packpath, files, constructor):
145 def _topacks(packpath, files, constructor):
146 paths = list(os.path.join(packpath, p) for p in files)
146 paths = list(os.path.join(packpath, p) for p in files)
147 packs = list(constructor(p) for p in paths)
147 packs = list(constructor(p) for p in paths)
148 return packs
148 return packs
149
149
150 def _deletebigpacks(repo, folder, files):
150 def _deletebigpacks(repo, folder, files):
151 """Deletes packfiles that are bigger than ``packs.maxpacksize``.
151 """Deletes packfiles that are bigger than ``packs.maxpacksize``.
152
152
153 Returns ``files`` with the removed files omitted."""
153 Returns ``files`` with the removed files omitted."""
154 maxsize = repo.ui.configbytes("packs", "maxpacksize")
154 maxsize = repo.ui.configbytes("packs", "maxpacksize")
155 if maxsize <= 0:
155 if maxsize <= 0:
156 return files
156 return files
157
157
158 # This only considers datapacks today, but we could broaden it to include
158 # This only considers datapacks today, but we could broaden it to include
159 # historypacks.
159 # historypacks.
160 VALIDEXTS = [".datapack", ".dataidx"]
160 VALIDEXTS = [".datapack", ".dataidx"]
161
161
162 # Either an oversize index or datapack will trigger cleanup of the whole
162 # Either an oversize index or datapack will trigger cleanup of the whole
163 # pack:
163 # pack:
164 oversized = set([os.path.splitext(path)[0] for path, ftype, stat in files
164 oversized = set([os.path.splitext(path)[0] for path, ftype, stat in files
165 if (stat.st_size > maxsize and (os.path.splitext(path)[1]
165 if (stat.st_size > maxsize and (os.path.splitext(path)[1]
166 in VALIDEXTS))])
166 in VALIDEXTS))])
167
167
168 for rootfname in oversized:
168 for rootfname in oversized:
169 rootpath = os.path.join(folder, rootfname)
169 rootpath = os.path.join(folder, rootfname)
170 for ext in VALIDEXTS:
170 for ext in VALIDEXTS:
171 path = rootpath + ext
171 path = rootpath + ext
172 repo.ui.debug('removing oversize packfile %s (%s)\n' %
172 repo.ui.debug('removing oversize packfile %s (%s)\n' %
173 (path, util.bytecount(os.stat(path).st_size)))
173 (path, util.bytecount(os.stat(path).st_size)))
174 os.unlink(path)
174 os.unlink(path)
175 return [row for row in files if os.path.basename(row[0]) not in oversized]
175 return [row for row in files if os.path.basename(row[0]) not in oversized]
176
176
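The oversize check keys on the filename without its extension, so a too-large .datapack (or .dataidx) marks the whole pack, and both sibling files are then unlinked by the loop above. A quick check of that set construction with fabricated directory entries:

import collections
import os

fakestat = collections.namedtuple('fakestat', 'st_size')
maxsize = 1000
VALIDEXTS = [".datapack", ".dataidx"]
files = [
    ('abc.datapack', 0, fakestat(5000)),    # oversize pack
    ('abc.dataidx', 0, fakestat(10)),       # its (small) index
    ('def.datapack', 0, fakestat(10)),      # healthy pack
]
oversized = set(os.path.splitext(path)[0] for path, ftype, stat in files
                if (stat.st_size > maxsize
                    and os.path.splitext(path)[1] in VALIDEXTS))
assert oversized == {'abc'}   # abc.datapack and abc.dataidx both get removed
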
177 def _incrementalrepack(repo, datastore, historystore, packpath, category,
177 def _incrementalrepack(repo, datastore, historystore, packpath, category,
178 allowincompletedata=False, options=None):
178 allowincompletedata=False, options=None):
179 shallowutil.mkstickygroupdir(repo.ui, packpath)
179 shallowutil.mkstickygroupdir(repo.ui, packpath)
180
180
181 files = osutil.listdir(packpath, stat=True)
181 files = osutil.listdir(packpath, stat=True)
182 files = _deletebigpacks(repo, packpath, files)
182 files = _deletebigpacks(repo, packpath, files)
183 datapacks = _topacks(packpath,
183 datapacks = _topacks(packpath,
184 _computeincrementaldatapack(repo.ui, files),
184 _computeincrementaldatapack(repo.ui, files),
185 datapack.datapack)
185 datapack.datapack)
186 datapacks.extend(s for s in datastore
186 datapacks.extend(s for s in datastore
187 if not isinstance(s, datapack.datapackstore))
187 if not isinstance(s, datapack.datapackstore))
188
188
189 historypacks = _topacks(packpath,
189 historypacks = _topacks(packpath,
190 _computeincrementalhistorypack(repo.ui, files),
190 _computeincrementalhistorypack(repo.ui, files),
191 historypack.historypack)
191 historypack.historypack)
192 historypacks.extend(s for s in historystore
192 historypacks.extend(s for s in historystore
193 if not isinstance(s, historypack.historypackstore))
193 if not isinstance(s, historypack.historypackstore))
194
194
195 # ``allhistory{files,packs}`` contains all known history packs, even ones we
195 # ``allhistory{files,packs}`` contains all known history packs, even ones we
196 # don't plan to repack. They are used during the datapack repack to ensure
196 # don't plan to repack. They are used during the datapack repack to ensure
197 # good ordering of nodes.
197 # good ordering of nodes.
198 allhistoryfiles = _allpackfileswithsuffix(files, historypack.PACKSUFFIX,
198 allhistoryfiles = _allpackfileswithsuffix(files, historypack.PACKSUFFIX,
199 historypack.INDEXSUFFIX)
199 historypack.INDEXSUFFIX)
200 allhistorypacks = _topacks(packpath,
200 allhistorypacks = _topacks(packpath,
201 (f for f, mode, stat in allhistoryfiles),
201 (f for f, mode, stat in allhistoryfiles),
202 historypack.historypack)
202 historypack.historypack)
203 allhistorypacks.extend(s for s in historystore
203 allhistorypacks.extend(s for s in historystore
204 if not isinstance(s, historypack.historypackstore))
204 if not isinstance(s, historypack.historypackstore))
205 _runrepack(repo,
205 _runrepack(repo,
206 contentstore.unioncontentstore(
206 contentstore.unioncontentstore(
207 *datapacks,
207 *datapacks,
208 allowincomplete=allowincompletedata),
208 allowincomplete=allowincompletedata),
209 metadatastore.unionmetadatastore(
209 metadatastore.unionmetadatastore(
210 *historypacks,
210 *historypacks,
211 allowincomplete=True),
211 allowincomplete=True),
212 packpath, category,
212 packpath, category,
213 fullhistory=metadatastore.unionmetadatastore(
213 fullhistory=metadatastore.unionmetadatastore(
214 *allhistorypacks,
214 *allhistorypacks,
215 allowincomplete=True),
215 allowincomplete=True),
216 options=options)
216 options=options)
217
217
218 def _computeincrementaldatapack(ui, files):
218 def _computeincrementaldatapack(ui, files):
219 opts = {
219 opts = {
220 'gencountlimit' : ui.configint(
220 'gencountlimit' : ui.configint(
221 'remotefilelog', 'data.gencountlimit'),
221 'remotefilelog', 'data.gencountlimit'),
222 'generations' : ui.configlist(
222 'generations' : ui.configlist(
223 'remotefilelog', 'data.generations'),
223 'remotefilelog', 'data.generations'),
224 'maxrepackpacks' : ui.configint(
224 'maxrepackpacks' : ui.configint(
225 'remotefilelog', 'data.maxrepackpacks'),
225 'remotefilelog', 'data.maxrepackpacks'),
226 'repackmaxpacksize' : ui.configbytes(
226 'repackmaxpacksize' : ui.configbytes(
227 'remotefilelog', 'data.repackmaxpacksize'),
227 'remotefilelog', 'data.repackmaxpacksize'),
228 'repacksizelimit' : ui.configbytes(
228 'repacksizelimit' : ui.configbytes(
229 'remotefilelog', 'data.repacksizelimit'),
229 'remotefilelog', 'data.repacksizelimit'),
230 }
230 }
231
231
232 packfiles = _allpackfileswithsuffix(
232 packfiles = _allpackfileswithsuffix(
233 files, datapack.PACKSUFFIX, datapack.INDEXSUFFIX)
233 files, datapack.PACKSUFFIX, datapack.INDEXSUFFIX)
234 return _computeincrementalpack(packfiles, opts)
234 return _computeincrementalpack(packfiles, opts)
235
235
236 def _computeincrementalhistorypack(ui, files):
236 def _computeincrementalhistorypack(ui, files):
237 opts = {
237 opts = {
238 'gencountlimit' : ui.configint(
238 'gencountlimit' : ui.configint(
239 'remotefilelog', 'history.gencountlimit'),
239 'remotefilelog', 'history.gencountlimit'),
240 'generations' : ui.configlist(
240 'generations' : ui.configlist(
241 'remotefilelog', 'history.generations', ['100MB']),
241 'remotefilelog', 'history.generations', ['100MB']),
242 'maxrepackpacks' : ui.configint(
242 'maxrepackpacks' : ui.configint(
243 'remotefilelog', 'history.maxrepackpacks'),
243 'remotefilelog', 'history.maxrepackpacks'),
244 'repackmaxpacksize' : ui.configbytes(
244 'repackmaxpacksize' : ui.configbytes(
245 'remotefilelog', 'history.repackmaxpacksize', '400MB'),
245 'remotefilelog', 'history.repackmaxpacksize', '400MB'),
246 'repacksizelimit' : ui.configbytes(
246 'repacksizelimit' : ui.configbytes(
247 'remotefilelog', 'history.repacksizelimit'),
247 'remotefilelog', 'history.repacksizelimit'),
248 }
248 }
249
249
250 packfiles = _allpackfileswithsuffix(
250 packfiles = _allpackfileswithsuffix(
251 files, historypack.PACKSUFFIX, historypack.INDEXSUFFIX)
251 files, historypack.PACKSUFFIX, historypack.INDEXSUFFIX)
252 return _computeincrementalpack(packfiles, opts)
252 return _computeincrementalpack(packfiles, opts)
253
253
254 def _allpackfileswithsuffix(files, packsuffix, indexsuffix):
254 def _allpackfileswithsuffix(files, packsuffix, indexsuffix):
255 result = []
255 result = []
256 fileset = set(fn for fn, mode, stat in files)
256 fileset = set(fn for fn, mode, stat in files)
257 for filename, mode, stat in files:
257 for filename, mode, stat in files:
258 if not filename.endswith(packsuffix):
258 if not filename.endswith(packsuffix):
259 continue
259 continue
260
260
261 prefix = filename[:-len(packsuffix)]
261 prefix = filename[:-len(packsuffix)]
262
262
263 # Don't process a pack if it doesn't have an index.
263 # Don't process a pack if it doesn't have an index.
264 if (prefix + indexsuffix) not in fileset:
264 if (prefix + indexsuffix) not in fileset:
265 continue
265 continue
266 result.append((prefix, mode, stat))
266 result.append((prefix, mode, stat))
267
267
268 return result
268 return result
269
269
def _computeincrementalpack(files, opts):
    """Given a set of pack files along with the configuration options, this
    function computes the list of files that should be packed as part of an
    incremental repack.

    It tries to strike a balance between keeping incremental repacks cheap
    (i.e. packing small things when possible) and rolling the packs up into
    the big ones over time.
    """

    limits = list(sorted((util.sizetoint(s) for s in opts['generations']),
                         reverse=True))
    limits.append(0)

    # Group the packs by generation (i.e. by size)
    generations = []
    for i in pycompat.xrange(len(limits)):
        generations.append([])

    sizes = {}
    for prefix, mode, stat in files:
        size = stat.st_size
        if size > opts['repackmaxpacksize']:
            continue

        sizes[prefix] = size
        for i, limit in enumerate(limits):
            if size > limit:
                generations[i].append(prefix)
                break

    # Steps for picking what packs to repack:
    # 1. Pick the largest generation with > gencountlimit pack files.
    # 2. Take the smallest three packs.
    # 3. While total-size-of-packs < repacksizelimit: add another pack

    # Find the largest generation with more than gencountlimit packs
    genpacks = []
    for i, limit in enumerate(limits):
        if len(generations[i]) > opts['gencountlimit']:
            # Sort to be smallest last, for easy popping later
            genpacks.extend(sorted(generations[i], reverse=True,
                                   key=lambda x: sizes[x]))
            break

    # Take as many packs from the generation as we can
    chosenpacks = genpacks[-3:]
    genpacks = genpacks[:-3]
    repacksize = sum(sizes[n] for n in chosenpacks)
    while (repacksize < opts['repacksizelimit'] and genpacks and
           len(chosenpacks) < opts['maxrepackpacks']):
        chosenpacks.append(genpacks.pop())
        repacksize += sizes[chosenpacks[-1]]

    return chosenpacks

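# Worked example (illustrative, with made-up sizes): for
# generations = ['1GB', '100MB', '1MB'] the sorted limits are
# [1GB, 100MB, 1MB, 0]. A 300MB pack then falls into generation 1 (> 100MB),
# a 5MB pack into generation 2 (> 1MB) and a 100KB pack into generation 3.
# If generation 2 is the first one holding more than gencountlimit packs,
# its three smallest packs are chosen first and further packs are popped
# smallest-first until repacksizelimit or maxrepackpacks is hit.
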
def _runrepack(repo, data, history, packpath, category, fullhistory=None,
               options=None):
    shallowutil.mkstickygroupdir(repo.ui, packpath)

    def isold(repo, filename, node):
        """Check if the file node is older than a limit.
        Unless a limit is specified in the config, the default limit is used.
        """
        filectx = repo.filectx(filename, fileid=node)
        filetime = repo[filectx.linkrev()].date()

        ttl = repo.ui.configint('remotefilelog', 'nodettl')

        limit = time.time() - ttl
        return filetime[0] < limit

    garbagecollect = repo.ui.configbool('remotefilelog', 'gcrepack')
    if not fullhistory:
        fullhistory = history
    packer = repacker(repo, data, history, fullhistory, category,
                      gc=garbagecollect, isold=isold, options=options)

    with datapack.mutabledatapack(repo.ui, packpath, version=2) as dpack:
        with historypack.mutablehistorypack(repo.ui, packpath) as hpack:
            try:
                packer.run(dpack, hpack)
            except error.LockHeld:
                raise RepackAlreadyRunning(_("skipping repack - another repack "
                                             "is already running"))

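# Illustrative note on isold() above (hypothetical value): with
# remotefilelog.nodettl set to 30 days (2592000 seconds), a node whose
# linkrev commit date is older than time.time() - 2592000 counts as old and
# is eligible for garbage collection unless it is in the keepset.
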
def keepset(repo, keyfn, lastkeepkeys=None):
    """Computes a keepset which is not garbage collected.
    'keyfn' is a function that maps filename, node to a unique key.
    'lastkeepkeys' is an optional argument and if provided the keepset
    function updates lastkeepkeys with more keys and returns the result.
    """
    if not lastkeepkeys:
        keepkeys = set()
    else:
        keepkeys = lastkeepkeys

    # We want to keep:
    # 1. Working copy parent
    # 2. Draft commits
    # 3. Parents of draft commits
    # 4. Pullprefetch and bgprefetchrevs revsets if specified
    revs = ['.', 'draft()', 'parents(draft())']
    prefetchrevs = repo.ui.config('remotefilelog', 'pullprefetch', None)
    if prefetchrevs:
        revs.append('(%s)' % prefetchrevs)
    prefetchrevs = repo.ui.config('remotefilelog', 'bgprefetchrevs', None)
    if prefetchrevs:
        revs.append('(%s)' % prefetchrevs)
    revs = '+'.join(revs)

    revs = ['sort((%s), "topo")' % revs]
    keep = scmutil.revrange(repo, revs)

    processed = set()
    lastmanifest = None

    # process the commits in toposorted order starting from the oldest
    for r in reversed(keep._list):
        if repo[r].p1().rev() in processed:
            # if the direct parent has already been processed
            # then we only need to process the delta
            m = repo[r].manifestctx().readdelta()
        else:
            # otherwise take the manifest and diff it
            # with the previous manifest if one exists
            if lastmanifest:
                m = repo[r].manifest().diff(lastmanifest)
            else:
                m = repo[r].manifest()
            lastmanifest = repo[r].manifest()
        processed.add(r)

        # populate keepkeys with keys from the current manifest
        if type(m) is dict:
            # m is a result of diff of two manifests and is a dictionary that
            # maps filename to ((newnode, newflag), (oldnode, oldflag)) tuple
            for filename, diff in m.iteritems():
                if diff[0][0] is not None:
                    keepkeys.add(keyfn(filename, diff[0][0]))
        else:
            # m is a manifest object
            for filename, filenode in m.iteritems():
                keepkeys.add(keyfn(filename, filenode))

    return keepkeys

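# Usage sketch (illustrative): the repacker below calls this with a tuple
# key function, e.g.
#
#   keepkeys = keepset(repo, lambda f, n: (f, n))
#
# so keepkeys holds (filename, filenode) pairs for the working copy parent,
# draft commits and their parents, plus any configured pullprefetch /
# bgprefetchrevs revsets.
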
class repacker(object):
    """Class for orchestrating the repack of data and history information into a
    new format.
    """
    def __init__(self, repo, data, history, fullhistory, category, gc=False,
                 isold=None, options=None):
        self.repo = repo
        self.data = data
        self.history = history
        self.fullhistory = fullhistory
        self.unit = constants.getunits(category)
        self.garbagecollect = gc
        self.options = options
        if self.garbagecollect:
            if not isold:
                raise ValueError("Function 'isold' is not properly specified")
            # use (filename, node) tuple as a keepset key
            self.keepkeys = keepset(repo, lambda f, n : (f, n))
            self.isold = isold

    def run(self, targetdata, targethistory):
        ledger = repackledger()

        with extutil.flock(repacklockvfs(self.repo).join("repacklock"),
                           _('repacking %s') % self.repo.origroot, timeout=0):
            self.repo.hook('prerepack')

            # Populate ledger from source
            self.data.markledger(ledger, options=self.options)
            self.history.markledger(ledger, options=self.options)

            # Run repack
            self.repackdata(ledger, targetdata)
            self.repackhistory(ledger, targethistory)

            # Call cleanup on each source
            for source in ledger.sources:
                source.cleanup(ledger)

    def _chainorphans(self, ui, filename, nodes, orphans, deltabases):
        """Reorders ``orphans`` into a single chain inside ``nodes`` and
        ``deltabases``.

        We often have orphan entries (nodes without a base that aren't
        referenced by other nodes -- i.e., not part of a chain) due to gaps in
        history. Rather than store them as individual fulltexts, we prefer to
        insert them as one chain sorted by size.
        """
        if not orphans:
            return nodes

        def getsize(node, default=0):
            meta = self.data.getmeta(filename, node)
            if constants.METAKEYSIZE in meta:
                return meta[constants.METAKEYSIZE]
            else:
                return default

        # Sort orphans by size; biggest first is preferred, since it's more
        # likely to be the newest version assuming files grow over time.
        # (Sort by node first to ensure the sort is stable.)
        orphans = sorted(orphans)
        orphans = list(sorted(orphans, key=getsize, reverse=True))
        if ui.debugflag:
            ui.debug("%s: orphan chain: %s\n" % (filename,
                ", ".join([short(s) for s in orphans])))

        # Create one contiguous chain and reassign deltabases.
        for i, node in enumerate(orphans):
            if i == 0:
                deltabases[node] = (nullid, 0)
            else:
                parent = orphans[i - 1]
                deltabases[node] = (parent, deltabases[parent][1] + 1)
        nodes = filter(lambda node: node not in orphans, nodes)
        nodes += orphans
        return nodes

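# Illustrative example of _chainorphans (hypothetical nodes a, b, c with
# sizes 30, 20, 10): after the biggest-first sort the reassignment yields
#   deltabases[a] = (nullid, 0)
#   deltabases[b] = (a, 1)
#   deltabases[c] = (b, 2)
# and a, b, c are appended to the end of the returned node list, so every
# orphan except the largest is stored as a delta against the next-larger one
# instead of as a fulltext.
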
    def repackdata(self, ledger, target):
        ui = self.repo.ui
        maxchainlen = ui.configint('packs', 'maxchainlen', 1000)

        byfile = {}
        for entry in ledger.entries.itervalues():
            if entry.datasource:
                byfile.setdefault(entry.filename, {})[entry.node] = entry

        count = 0
        for filename, entries in sorted(byfile.iteritems()):
            ui.progress(_("repacking data"), count, unit=self.unit,
                        total=len(byfile))

            ancestors = {}
            nodes = list(node for node in entries)
            nohistory = []
            for i, node in enumerate(nodes):
                if node in ancestors:
                    continue
                ui.progress(_("building history"), i, unit='nodes',
                            total=len(nodes))
                try:
                    ancestors.update(self.fullhistory.getancestors(filename,
                        node, known=ancestors))
                except KeyError:
                    # Since we're packing data entries, we may not have the
                    # corresponding history entries for them. It's not a big
                    # deal, but the entries won't be delta'd perfectly.
                    nohistory.append(node)
            ui.progress(_("building history"), None)

            # Order the nodes children first, so we can produce reverse deltas
            orderednodes = list(reversed(self._toposort(ancestors)))
            if len(nohistory) > 0:
                ui.debug('repackdata: %d nodes without history\n' %
                         len(nohistory))
                orderednodes.extend(sorted(nohistory))

            # Filter orderednodes to just the nodes we want to serialize (it
            # currently also has the edge nodes' ancestors).
            orderednodes = filter(lambda node: node in nodes, orderednodes)

            # Garbage collect old nodes:
            if self.garbagecollect:
                neworderednodes = []
                for node in orderednodes:
                    # If the node is old and is not in the keepset, we skip it,
                    # and mark as garbage collected
                    if ((filename, node) not in self.keepkeys and
                        self.isold(self.repo, filename, node)):
                        entries[node].gced = True
                        continue
                    neworderednodes.append(node)
                orderednodes = neworderednodes

            # Compute delta bases for nodes:
            deltabases = {}
            nobase = set()
            referenced = set()
            nodes = set(nodes)
            for i, node in enumerate(orderednodes):
                ui.progress(_("processing nodes"), i, unit='nodes',
                            total=len(orderednodes))
                # Find delta base
                # TODO: allow delta'ing against most recent descendant instead
                # of immediate child
                deltatuple = deltabases.get(node, None)
                if deltatuple is None:
                    deltabase, chainlen = nullid, 0
                    deltabases[node] = (nullid, 0)
                    nobase.add(node)
                else:
                    deltabase, chainlen = deltatuple
                    referenced.add(deltabase)

                # Use available ancestor information to inform our delta choices
                ancestorinfo = ancestors.get(node)
                if ancestorinfo:
                    p1, p2, linknode, copyfrom = ancestorinfo

                    # The presence of copyfrom means we're at a point where the
                    # file was copied from elsewhere. So don't attempt to do any
                    # deltas with the other file.
                    if copyfrom:
                        p1 = nullid

                    if chainlen < maxchainlen:
                        # Record this child as the delta base for its parents.
                        # This may be non-optimal, since the parents may have
                        # many children, and this will only choose the last one.
                        # TODO: record all children and try all deltas to find
                        # best
                        if p1 != nullid:
                            deltabases[p1] = (node, chainlen + 1)
                        if p2 != nullid:
                            deltabases[p2] = (node, chainlen + 1)

            # experimental config: repack.chainorphansbysize
            if ui.configbool('repack', 'chainorphansbysize'):
                orphans = nobase - referenced
                orderednodes = self._chainorphans(ui, filename, orderednodes,
                                                  orphans, deltabases)

            # Compute deltas and write to the pack
            for i, node in enumerate(orderednodes):
                deltabase, chainlen = deltabases[node]
                # Compute delta
                # TODO: Optimize the deltachain fetching. Since we're
                # iterating over the different versions of the file, we may
                # be fetching the same deltachain over and over again.
                meta = None
                if deltabase != nullid:
                    deltaentry = self.data.getdelta(filename, node)
                    delta, deltabasename, origdeltabase, meta = deltaentry
                    size = meta.get(constants.METAKEYSIZE)
                    if (deltabasename != filename or origdeltabase != deltabase
                        or size is None):
                        deltabasetext = self.data.get(filename, deltabase)
                        original = self.data.get(filename, node)
                        size = len(original)
                        delta = mdiff.textdiff(deltabasetext, original)
                else:
                    delta = self.data.get(filename, node)
                    size = len(delta)
                    meta = self.data.getmeta(filename, node)

                # TODO: don't use the delta if it's larger than the fulltext
                if constants.METAKEYSIZE not in meta:
                    meta[constants.METAKEYSIZE] = size
                target.add(filename, node, deltabase, delta, meta)

                entries[node].datarepacked = True

            ui.progress(_("processing nodes"), None)
            count += 1

        ui.progress(_("repacking data"), None)
        target.close(ledger=ledger)

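    # Illustrative sketch of the reverse-delta bookkeeping above (hypothetical
    # nodes): when a child c with parents p1, p2 is processed, the loop records
    #   deltabases[p1] = (c, chainlen + 1)
    #   deltabases[p2] = (c, chainlen + 1)
    # so that each parent, reached later in the children-first order, is
    # written as a delta against its child. Nodes left in nobase - referenced
    # are the orphans handed to _chainorphans when repack.chainorphansbysize
    # is enabled.
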
    def repackhistory(self, ledger, target):
        ui = self.repo.ui

        byfile = {}
        for entry in ledger.entries.itervalues():
            if entry.historysource:
                byfile.setdefault(entry.filename, {})[entry.node] = entry

        count = 0
        for filename, entries in sorted(byfile.iteritems()):
            ancestors = {}
            nodes = list(node for node in entries)

            for node in nodes:
                if node in ancestors:
                    continue
                ancestors.update(self.history.getancestors(filename, node,
                                                           known=ancestors))

            # Order the nodes children first
            orderednodes = reversed(self._toposort(ancestors))

            # Write to the pack
            dontprocess = set()
            for node in orderednodes:
                p1, p2, linknode, copyfrom = ancestors[node]

                # If the node is marked dontprocess, but it's also in the
                # explicit entries set, that means the node exists both in this
                # file and in another file that was copied to this file.
                # Usually this happens if the file was copied to another file,
                # then the copy was deleted, then reintroduced without copy
                # metadata. The original add and the new add have the same hash
                # since the content is identical and the parents are null.
                if node in dontprocess and node not in entries:
                    # If copyfrom == filename, it means the copy history
                    # went to some other file, then came back to this one, so
                    # we should continue processing it.
                    if p1 != nullid and copyfrom != filename:
                        dontprocess.add(p1)
                    if p2 != nullid:
                        dontprocess.add(p2)
                    continue

                if copyfrom:
                    dontprocess.add(p1)

                target.add(filename, node, p1, p2, linknode, copyfrom)

                if node in entries:
                    entries[node].historyrepacked = True

            count += 1
            ui.progress(_("repacking history"), count, unit=self.unit,
                        total=len(byfile))

        ui.progress(_("repacking history"), None)
        target.close(ledger=ledger)

    def _toposort(self, ancestors):
        def parentfunc(node):
            p1, p2, linknode, copyfrom = ancestors[node]
            parents = []
            if p1 != nullid:
                parents.append(p1)
            if p2 != nullid:
                parents.append(p2)
            return parents

        sortednodes = shallowutil.sortnodes(ancestors.keys(), parentfunc)
        return sortednodes

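# Illustrative example of _toposort (hypothetical nodes): for a linear
# history root -> mid -> tip, sortnodes() returns [root, mid, tip] (parents
# before children), and the reversed() calls in repackdata/repackhistory turn
# that into the children-first order [tip, mid, root] used for writing.
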
class repackledger(object):
    """Storage for all the bookkeeping that happens during a repack. It contains
    the list of revisions being repacked, what happened to each revision, and
    which source store contained which revision originally (for later cleanup).
    """
    def __init__(self):
        self.entries = {}
        self.sources = {}
        self.created = set()

    def markdataentry(self, source, filename, node):
        """Mark the given filename+node revision as having a data rev in the
        given source.
        """
        entry = self._getorcreateentry(filename, node)
        entry.datasource = True
        entries = self.sources.get(source)
        if not entries:
            entries = set()
            self.sources[source] = entries
        entries.add(entry)

    def markhistoryentry(self, source, filename, node):
        """Mark the given filename+node revision as having a history rev in the
        given source.
        """
        entry = self._getorcreateentry(filename, node)
        entry.historysource = True
        entries = self.sources.get(source)
        if not entries:
            entries = set()
            self.sources[source] = entries
        entries.add(entry)

    def _getorcreateentry(self, filename, node):
        key = (filename, node)
        value = self.entries.get(key)
        if not value:
            value = repackentry(filename, node)
            self.entries[key] = value

        return value

    def addcreated(self, value):
        self.created.add(value)

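# Usage sketch (illustrative, not taken from this file): a data store's
# markledger() implementation would typically loop over its (filename, node)
# entries and call
#
#   ledger.markdataentry(self, filename, node)
#
# (markhistoryentry for history stores), so that cleanup() can later tell
# which source packs had all of their entries repacked.
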
class repackentry(object):
    """Simple class representing a single revision entry in the repackledger.
    """
    __slots__ = (r'filename', r'node', r'datasource', r'historysource',
                 r'datarepacked', r'historyrepacked', r'gced')
    def __init__(self, filename, node):
        self.filename = filename
        self.node = node
        # If the revision has a data entry in the source
        self.datasource = False
        # If the revision has a history entry in the source
        self.historysource = False
        # If the revision's data entry was repacked into the repack target
        self.datarepacked = False
        # If the revision's history entry was repacked into the repack target
        self.historyrepacked = False
        # If garbage collected
        self.gced = False

def repacklockvfs(repo):
    if util.safehasattr(repo, 'name'):
        # Lock in the shared cache so repacks across multiple copies of the same
        # repo are coordinated.
        sharedcachepath = shallowutil.getcachepackpath(
            repo,
            constants.FILEPACK_CATEGORY)
        return vfs.vfs(sharedcachepath)
    else:
        return repo.svfs