remotefilelog: prefetch files in deterministic order...
Martin von Zweigbergk
r42203:0129bf02 default
@@ -1,581 +1,581 @@
# fileserverclient.py - client for communicating with the cache process
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import hashlib
import io
import os
import threading
import time
import zlib

from mercurial.i18n import _
from mercurial.node import bin, hex, nullid
from mercurial import (
    error,
    node,
    pycompat,
    revlog,
    sshpeer,
    util,
    wireprotov1peer,
)
from mercurial.utils import procutil

from . import (
    constants,
    contentstore,
    metadatastore,
)

_sshv1peer = sshpeer.sshv1peer

# Statistics for debugging
fetchcost = 0
fetches = 0
fetched = 0
fetchmisses = 0

_lfsmod = None

def getcachekey(reponame, file, id):
    pathhash = node.hex(hashlib.sha1(file).digest())
    return os.path.join(reponame, pathhash[:2], pathhash[2:], id)

def getlocalkey(file, id):
    pathhash = node.hex(hashlib.sha1(file).digest())
    return os.path.join(pathhash, id)
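
For illustration, the shared cache shards entries by the sha1 of the file path: the first two hex characters become a directory fan-out and the remaining 38 a subdirectory. A quick sketch with hypothetical values (not part of the change):

fileid = '0123456789abcdef0123456789abcdef01234567'  # made-up 40-hex node
key = getcachekey('myrepo', 'foo/bar.c', fileid)
# key == 'myrepo/<first 2 hex of sha1("foo/bar.c")>/<remaining 38>/<fileid>'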

def peersetup(ui, peer):

    class remotefilepeer(peer.__class__):
        @wireprotov1peer.batchable
        def x_rfl_getfile(self, file, node):
            if not self.capable('x_rfl_getfile'):
                raise error.Abort(
                    'configured remotefile server does not support getfile')
            f = wireprotov1peer.future()
            yield {'file': file, 'node': node}, f
            code, data = f.value.split('\0', 1)
            if int(code):
                raise error.LookupError(file, node, data)
            yield data

        @wireprotov1peer.batchable
        def x_rfl_getflogheads(self, path):
            if not self.capable('x_rfl_getflogheads'):
                raise error.Abort('configured remotefile server does not '
                                  'support getflogheads')
            f = wireprotov1peer.future()
            yield {'path': path}, f
            heads = f.value.split('\n') if f.value else []
            yield heads

        def _updatecallstreamopts(self, command, opts):
            if command != 'getbundle':
                return
            if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                not in self.capabilities()):
                return
            if not util.safehasattr(self, '_localrepo'):
                return
            if (constants.SHALLOWREPO_REQUIREMENT
                not in self._localrepo.requirements):
                return

            bundlecaps = opts.get('bundlecaps')
            if bundlecaps:
                bundlecaps = [bundlecaps]
            else:
                bundlecaps = []

            # shallow, includepattern, and excludepattern are a hacky way of
            # carrying over data from the local repo to this getbundle
            # command. We need to do it this way because bundle1 getbundle
            # doesn't provide any other place we can hook in to manipulate
            # getbundle args before it goes across the wire. Once we get rid
            # of bundle1, we can use bundle2's _pullbundle2extraprepare to
            # do this more cleanly.
            bundlecaps.append(constants.BUNDLE2_CAPABLITY)
            if self._localrepo.includepattern:
                patterns = '\0'.join(self._localrepo.includepattern)
                includecap = "includepattern=" + patterns
                bundlecaps.append(includecap)
            if self._localrepo.excludepattern:
                patterns = '\0'.join(self._localrepo.excludepattern)
                excludecap = "excludepattern=" + patterns
                bundlecaps.append(excludecap)
            opts['bundlecaps'] = ','.join(bundlecaps)

        def _sendrequest(self, command, args, **opts):
            self._updatecallstreamopts(command, args)
            return super(remotefilepeer, self)._sendrequest(command, args,
                                                            **opts)

        def _callstream(self, command, **opts):
            supertype = super(remotefilepeer, self)
            if not util.safehasattr(supertype, '_sendrequest'):
                self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
            return super(remotefilepeer, self)._callstream(command, **opts)

    peer.__class__ = remotefilepeer
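
To make the capability plumbing concrete, here is a sketch (hypothetical patterns; everything else is as defined above) of what _updatecallstreamopts builds for a shallow repo with include and exclude patterns:

includepattern = ['a/*', 'b/*']            # hypothetical narrow patterns
excludepattern = ['b/secret']
bundlecaps = [constants.BUNDLE2_CAPABLITY,
              'includepattern=' + '\0'.join(includepattern),
              'excludepattern=' + '\0'.join(excludepattern)]
opts = {'bundlecaps': ','.join(bundlecaps)}
# The patterns ride along as getbundle bundlecaps because bundle1 offers
# no other hook to adjust arguments before they cross the wire.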

class cacheconnection(object):
    """The connection for communicating with the remote cache. Performs
    gets and sets by communicating with an external process that has the
    cache-specific implementation.
    """
    def __init__(self):
        self.pipeo = self.pipei = self.pipee = None
        self.subprocess = None
        self.connected = False

    def connect(self, cachecommand):
        if self.pipeo:
            raise error.Abort(_("cache connection already open"))
        self.pipei, self.pipeo, self.pipee, self.subprocess = (
            procutil.popen4(cachecommand))
        self.connected = True

    def close(self):
        def tryclose(pipe):
            try:
                pipe.close()
            except Exception:
                pass
        if self.connected:
            try:
                self.pipei.write("exit\n")
            except Exception:
                pass
            tryclose(self.pipei)
            self.pipei = None
            tryclose(self.pipeo)
            self.pipeo = None
            tryclose(self.pipee)
            self.pipee = None
            try:
                # Wait for process to terminate, making sure to avoid deadlock.
                # See https://docs.python.org/2/library/subprocess.html for
                # warnings about wait() and deadlocking.
                self.subprocess.communicate()
            except Exception:
                pass
            self.subprocess = None
        self.connected = False

    def request(self, request, flush=True):
        if self.connected:
            try:
                self.pipei.write(request)
                if flush:
                    self.pipei.flush()
            except IOError:
                self.close()

    def receiveline(self):
        if not self.connected:
            return None
        try:
            result = self.pipeo.readline()[:-1]
            if not result:
                self.close()
        except IOError:
            self.close()

        return result
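
For reference, a toy cache process compatible with this connection might look like the sketch below (hypothetical; it mirrors the always-miss simplecache fallback defined later in fileserverclient.connect()). Each request is a command line ('get', 'set', or 'exit'), a count line, and that many key lines; a get response lists missing keys one per line and terminates with a '0' line.

import sys

def toycacheprocess():
    while True:
        cmd = sys.stdin.readline()[:-1]
        if not cmd or cmd == 'exit':
            break
        count = int(sys.stdin.readline())
        keys = [sys.stdin.readline()[:-1] for _ in range(count)]
        if cmd == 'get':
            for key in keys:
                sys.stdout.write(key + '\n')  # report every key as a miss
            sys.stdout.write('0\n')           # terminate the response
            sys.stdout.flush()
        # 'set' names keys just written to the local cache; a real process
        # would presumably upload those entries to the shared cache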

def _getfilesbatch(
        remote, receivemissing, progresstick, missed, idmap, batchsize):
    # Over http(s), iterbatch is a streamy method and we can start
    # looking at results early. This means we send one (potentially
    # large) request, but then we show nice progress as we process
    # file results, rather than showing chunks of $batchsize in
    # progress.
    #
    # Over ssh, iterbatch isn't streamy because batch() wasn't
    # explicitly designed as a streaming method. In the future we
    # should probably introduce a streambatch() method upstream and
    # use that for this.
    with remote.commandexecutor() as e:
        futures = []
        for m in missed:
            futures.append(e.callcommand('x_rfl_getfile', {
                'file': idmap[m],
                'node': m[-40:]
            }))

        for i, m in enumerate(missed):
            r = futures[i].result()
            futures[i] = None  # release memory
            file_ = idmap[m]
            node = m[-40:]
            receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
            progresstick()

def _getfiles_optimistic(
        remote, receivemissing, progresstick, missed, idmap, step):
    remote._callstream("x_rfl_getfiles")
    i = 0
    pipeo = remote._pipeo
    pipei = remote._pipei
    while i < len(missed):
        # issue a batch of requests
        start = i
        end = min(len(missed), start + step)
        i = end
        for missingid in missed[start:end]:
            # issue new request
            versionid = missingid[-40:]
            file = idmap[missingid]
            sshrequest = "%s%s\n" % (versionid, file)
            pipeo.write(sshrequest)
        pipeo.flush()

        # receive batch results
        for missingid in missed[start:end]:
            versionid = missingid[-40:]
            file = idmap[missingid]
            receivemissing(pipei, file, versionid)
            progresstick()

    # End the command
    pipeo.write('\n')
    pipeo.flush()

def _getfiles_threaded(
        remote, receivemissing, progresstick, missed, idmap, step):
    remote._callstream("getfiles")
    pipeo = remote._pipeo
    pipei = remote._pipei

    def writer():
        for missingid in missed:
            versionid = missingid[-40:]
            file = idmap[missingid]
            sshrequest = "%s%s\n" % (versionid, file)
            pipeo.write(sshrequest)
        pipeo.flush()
    writerthread = threading.Thread(target=writer)
    writerthread.daemon = True
    writerthread.start()

    for missingid in missed:
        versionid = missingid[-40:]
        file = idmap[missingid]
        receivemissing(pipei, file, versionid)
        progresstick()

    writerthread.join()
    # End the command
    pipeo.write('\n')
    pipeo.flush()
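
Both ssh variants frame each request as the 40-character hex node immediately followed by the file path, one request per line, with a bare newline closing the command; a sketch with made-up values:

versionid = 'ab' * 20                       # hypothetical 40-hex node
sshrequest = "%s%s\n" % (versionid, 'foo/bar.c')
# No separator is needed: the node is fixed-width, so the server splits
# at column 40. The final bare '\n' written after the loops above ends
# the stream of requests.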

class fileserverclient(object):
    """A client for requesting files from the remote file server.
    """
    def __init__(self, repo):
        ui = repo.ui
        self.repo = repo
        self.ui = ui
        self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
        if self.cacheprocess:
            self.cacheprocess = util.expandpath(self.cacheprocess)

        # This option causes remotefilelog to pass the full file path to the
        # cacheprocess instead of a hashed key.
        self.cacheprocesspasspath = ui.configbool(
            "remotefilelog", "cacheprocess.includepath")

        self.debugoutput = ui.configbool("remotefilelog", "debug")

        self.remotecache = cacheconnection()

    def setstore(self, datastore, historystore, writedata, writehistory):
        self.datastore = datastore
        self.historystore = historystore
        self.writedata = writedata
        self.writehistory = writehistory

    def _connect(self):
        return self.repo.connectionpool.get(self.repo.fallbackpath)

    def request(self, fileids):
        """Takes a list of filename/node pairs and fetches them from the
        server. Files are stored in the local cache.
        A list of nodes that the server couldn't find is returned.
        If the connection fails, an exception is raised.
        """
        if not self.remotecache.connected:
            self.connect()
        cache = self.remotecache
        writedata = self.writedata

        repo = self.repo
        total = len(fileids)
        request = "get\n%d\n" % total
        idmap = {}
        reponame = repo.name
        for file, id in fileids:
            fullid = getcachekey(reponame, file, id)
            if self.cacheprocesspasspath:
                request += file + '\0'
            request += fullid + "\n"
            idmap[fullid] = file

        cache.request(request)

        progress = self.ui.makeprogress(_('downloading'), total=total)
        progress.update(0)

        missed = []
        while True:
            missingid = cache.receiveline()
            if not missingid:
                missedset = set(missed)
                for missingid in idmap:
                    if not missingid in missedset:
                        missed.append(missingid)
                self.ui.warn(_("warning: cache connection closed early - " +
                               "falling back to server\n"))
                break
            if missingid == "0":
                break
            if missingid.startswith("_hits_"):
                # receive progress reports
                parts = missingid.split("_")
                progress.increment(int(parts[2]))
                continue

            missed.append(missingid)

        global fetchmisses
        fetchmisses += len(missed)

        fromcache = total - len(missed)
        progress.update(fromcache, total=total)
        self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
                    fromcache, total, hit=fromcache, total=total)

        oldumask = os.umask(0o002)
        try:
            # receive cache misses from master
            if missed:
                # When verbose is true, sshpeer prints 'running ssh...'
                # to stdout, which can interfere with some command
                # outputs
                verbose = self.ui.verbose
                self.ui.verbose = False
                try:
                    with self._connect() as conn:
                        remote = conn.peer
                        if remote.capable(
                            constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
                            if not isinstance(remote, _sshv1peer):
                                raise error.Abort('remotefilelog requires ssh '
                                                  'servers')
                            step = self.ui.configint('remotefilelog',
                                                     'getfilesstep')
                            getfilestype = self.ui.config('remotefilelog',
                                                          'getfilestype')
                            if getfilestype == 'threaded':
                                _getfiles = _getfiles_threaded
                            else:
                                _getfiles = _getfiles_optimistic
                            _getfiles(remote, self.receivemissing,
                                      progress.increment, missed, idmap, step)
                        elif remote.capable("x_rfl_getfile"):
                            if remote.capable('batch'):
                                batchdefault = 100
                            else:
                                batchdefault = 10
                            batchsize = self.ui.configint(
                                'remotefilelog', 'batchsize', batchdefault)
                            _getfilesbatch(
                                remote, self.receivemissing, progress.increment,
                                missed, idmap, batchsize)
                        else:
                            raise error.Abort("configured remotefilelog server"
                                              " does not support remotefilelog")

                    self.ui.log("remotefilefetchlog",
                                "Success\n",
                                fetched_files = progress.pos - fromcache,
                                total_to_fetch = total - fromcache)
                except Exception:
                    self.ui.log("remotefilefetchlog",
                                "Fail\n",
                                fetched_files = progress.pos - fromcache,
                                total_to_fetch = total - fromcache)
                    raise
                finally:
                    self.ui.verbose = verbose
                # send to memcache
                request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed))
                cache.request(request)

            progress.complete()

            # mark ourselves as a user of this cache
            writedata.markrepo(self.repo.path)
        finally:
            os.umask(oldumask)

    def receivemissing(self, pipe, filename, node):
        line = pipe.readline()[:-1]
        if not line:
            raise error.ResponseError(_("error downloading file contents:"),
                                      _("connection closed early"))
        size = int(line)
        data = pipe.read(size)
        if len(data) != size:
            raise error.ResponseError(_("error downloading file contents:"),
                                      _("only received %s of %s bytes")
                                      % (len(data), size))

        self.writedata.addremotefilelognode(filename, bin(node),
                                            zlib.decompress(data))
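
The response format consumed by receivemissing() is a decimal size line followed by exactly that many bytes of zlib-compressed file data. For illustration, a server-side frame for hypothetical contents could be built like this:

payload = zlib.compress('contents of foo/bar.c at some node')  # hypothetical
frame = "%d\n%s" % (len(payload), payload)
# receivemissing() reads the size line, then len(payload) bytes, and
# stores zlib.decompress(payload) under (filename, node).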

    def connect(self):
        if self.cacheprocess:
            cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
            self.remotecache.connect(cmd)
        else:
            # If no cache process is specified, we fake one that always
            # returns cache misses. This enables tests to run easily
            # and may eventually allow us to be a drop in replacement
            # for the largefiles extension.
            class simplecache(object):
                def __init__(self):
                    self.missingids = []
                    self.connected = True

                def close(self):
                    pass

                def request(self, value, flush=True):
                    lines = value.split("\n")
                    if lines[0] != "get":
                        return
                    self.missingids = lines[2:-1]
                    self.missingids.append('0')

                def receiveline(self):
                    if len(self.missingids) > 0:
                        return self.missingids.pop(0)
                    return None

            self.remotecache = simplecache()

    def close(self):
        if fetches:
            msg = ("%d files fetched over %d fetches - " +
                   "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
                       fetched,
                       fetches,
                       fetchmisses,
                       float(fetched - fetchmisses) / float(fetched) * 100.0,
                       fetchcost)
            if self.debugoutput:
                self.ui.warn(msg)
            self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
                        remotefilelogfetched=fetched,
                        remotefilelogfetches=fetches,
                        remotefilelogfetchmisses=fetchmisses,
                        remotefilelogfetchtime=fetchcost * 1000)

        if self.remotecache.connected:
            self.remotecache.close()

    def prefetch(self, fileids, force=False, fetchdata=True,
                 fetchhistory=False):
        """downloads the given file versions to the cache
        """
        repo = self.repo
        idstocheck = []
        for file, id in fileids:
            # hack
            # - we don't use .hgtags
            # - workingctx produces ids with length 42,
            #   which we skip since they aren't in any cache
            if (file == '.hgtags' or len(id) == 42
                or not repo.shallowmatch(file)):
                continue

            idstocheck.append((file, bin(id)))

        datastore = self.datastore
        historystore = self.historystore
        if force:
            datastore = contentstore.unioncontentstore(*repo.shareddatastores)
            historystore = metadatastore.unionmetadatastore(
                *repo.sharedhistorystores)

        missingids = set()
        if fetchdata:
            missingids.update(datastore.getmissing(idstocheck))
        if fetchhistory:
            missingids.update(historystore.getmissing(idstocheck))

        # partition missing nodes into nullid and not-nullid so we can
        # warn about this filtering potentially shadowing bugs.
        nullids = len([None for unused, id in missingids if id == nullid])
        if nullids:
            missingids = [(f, id) for f, id in missingids if id != nullid]
            repo.ui.develwarn(
                ('remotefilelog not fetching %d null revs'
                 ' - this is likely hiding bugs' % nullids),
                config='remotefilelog-ext')
        if missingids:
            global fetches, fetched, fetchcost
            fetches += 1

            # We want to be able to detect excess individual file downloads, so
            # let's log that information for debugging.
            if fetches >= 15 and fetches < 18:
                if fetches == 15:
                    fetchwarning = self.ui.config('remotefilelog',
                                                  'fetchwarning')
                    if fetchwarning:
                        self.ui.warn(fetchwarning + '\n')
                self.logstacktrace()
-            missingids = [(file, hex(id)) for file, id in missingids]
+            missingids = [(file, hex(id)) for file, id in sorted(missingids)]
            fetched += len(missingids)
            start = time.time()
            missingids = self.request(missingids)
            if missingids:
                raise error.Abort(_("unable to download %d files") %
                                  len(missingids))
            fetchcost += time.time() - start
            self._lfsprefetch(fileids)
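
The sorted() call above is this commit's change: missingids is built as a set, whose iteration order can vary from run to run, so sorting the (file, node) pairs makes prefetch issue its requests in a deterministic order. A minimal sketch with hypothetical ids:

missingids = {('b.txt', '\x02' * 20), ('a.txt', '\x01' * 20)}
ordered = [(f, hex(n)) for f, n in sorted(missingids)]
# 'a.txt' always comes first, however the set happens to iterate, so the
# server sees a reproducible request sequence.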

    def _lfsprefetch(self, fileids):
        if not _lfsmod or not util.safehasattr(
                self.repo.svfs, 'lfslocalblobstore'):
            return
        if not _lfsmod.wrapper.candownload(self.repo):
            return
        pointers = []
        store = self.repo.svfs.lfslocalblobstore
        for file, id in fileids:
            node = bin(id)
            rlog = self.repo.file(file)
            if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                text = rlog.revision(node, raw=True)
                p = _lfsmod.pointer.deserialize(text)
                oid = p.oid()
                if not store.has(oid):
                    pointers.append(p)
        if len(pointers) > 0:
            self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
            assert all(store.has(p.oid()) for p in pointers)

    def logstacktrace(self):
        import traceback
        self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
                    ''.join(traceback.format_stack()))