remotefilelog: log when we're about to fetch files...
Augie Fackler
r42451:65f3a772 default

@@ -1,581 +1,584 @@
 # fileserverclient.py - client for communicating with the cache process
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import hashlib
 import io
 import os
 import threading
 import time
 import zlib
 
 from mercurial.i18n import _
 from mercurial.node import bin, hex, nullid
 from mercurial import (
     error,
     node,
     pycompat,
     revlog,
     sshpeer,
     util,
     wireprotov1peer,
 )
 from mercurial.utils import procutil
 
 from . import (
     constants,
     contentstore,
     metadatastore,
 )
 
 _sshv1peer = sshpeer.sshv1peer
 
 # Statistics for debugging
 fetchcost = 0
 fetches = 0
 fetched = 0
 fetchmisses = 0
 
 _lfsmod = None
 
 def getcachekey(reponame, file, id):
     pathhash = node.hex(hashlib.sha1(file).digest())
     return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
 
 def getlocalkey(file, id):
     pathhash = node.hex(hashlib.sha1(file).digest())
     return os.path.join(pathhash, id)
 
 def peersetup(ui, peer):
 
     class remotefilepeer(peer.__class__):
         @wireprotov1peer.batchable
         def x_rfl_getfile(self, file, node):
             if not self.capable('x_rfl_getfile'):
                 raise error.Abort(
                     'configured remotefile server does not support getfile')
             f = wireprotov1peer.future()
             yield {'file': file, 'node': node}, f
             code, data = f.value.split('\0', 1)
             if int(code):
                 raise error.LookupError(file, node, data)
             yield data
 
         @wireprotov1peer.batchable
         def x_rfl_getflogheads(self, path):
             if not self.capable('x_rfl_getflogheads'):
                 raise error.Abort('configured remotefile server does not '
                                   'support getflogheads')
             f = wireprotov1peer.future()
             yield {'path': path}, f
             heads = f.value.split('\n') if f.value else []
             yield heads
 
         def _updatecallstreamopts(self, command, opts):
             if command != 'getbundle':
                 return
             if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                 not in self.capabilities()):
                 return
             if not util.safehasattr(self, '_localrepo'):
                 return
             if (constants.SHALLOWREPO_REQUIREMENT
                 not in self._localrepo.requirements):
                 return
 
             bundlecaps = opts.get('bundlecaps')
             if bundlecaps:
                 bundlecaps = [bundlecaps]
             else:
                 bundlecaps = []
 
             # shallow, includepattern, and excludepattern are a hacky way of
             # carrying over data from the local repo to this getbundle
             # command. We need to do it this way because bundle1 getbundle
             # doesn't provide any other place we can hook in to manipulate
             # getbundle args before it goes across the wire. Once we get rid
             # of bundle1, we can use bundle2's _pullbundle2extraprepare to
             # do this more cleanly.
             bundlecaps.append(constants.BUNDLE2_CAPABLITY)
             if self._localrepo.includepattern:
                 patterns = '\0'.join(self._localrepo.includepattern)
                 includecap = "includepattern=" + patterns
                 bundlecaps.append(includecap)
             if self._localrepo.excludepattern:
                 patterns = '\0'.join(self._localrepo.excludepattern)
                 excludecap = "excludepattern=" + patterns
                 bundlecaps.append(excludecap)
             opts['bundlecaps'] = ','.join(bundlecaps)
 
         def _sendrequest(self, command, args, **opts):
             self._updatecallstreamopts(command, args)
             return super(remotefilepeer, self)._sendrequest(command, args,
                                                             **opts)
 
         def _callstream(self, command, **opts):
             supertype = super(remotefilepeer, self)
             if not util.safehasattr(supertype, '_sendrequest'):
                 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
             return super(remotefilepeer, self)._callstream(command, **opts)
 
     peer.__class__ = remotefilepeer
 
 class cacheconnection(object):
     """The connection for communicating with the remote cache. Performs
     gets and sets by communicating with an external process that has the
     cache-specific implementation.
     """
     def __init__(self):
         self.pipeo = self.pipei = self.pipee = None
         self.subprocess = None
         self.connected = False
 
     def connect(self, cachecommand):
         if self.pipeo:
             raise error.Abort(_("cache connection already open"))
         self.pipei, self.pipeo, self.pipee, self.subprocess = (
             procutil.popen4(cachecommand))
         self.connected = True
 
     def close(self):
         def tryclose(pipe):
             try:
                 pipe.close()
             except Exception:
                 pass
         if self.connected:
             try:
                 self.pipei.write("exit\n")
             except Exception:
                 pass
             tryclose(self.pipei)
             self.pipei = None
             tryclose(self.pipeo)
             self.pipeo = None
             tryclose(self.pipee)
             self.pipee = None
             try:
                 # Wait for process to terminate, making sure to avoid deadlock.
                 # See https://docs.python.org/2/library/subprocess.html for
                 # warnings about wait() and deadlocking.
                 self.subprocess.communicate()
             except Exception:
                 pass
             self.subprocess = None
         self.connected = False
 
     def request(self, request, flush=True):
         if self.connected:
             try:
                 self.pipei.write(request)
                 if flush:
                     self.pipei.flush()
             except IOError:
                 self.close()
 
     def receiveline(self):
         if not self.connected:
             return None
         try:
             result = self.pipeo.readline()[:-1]
             if not result:
                 self.close()
         except IOError:
             self.close()
 
         return result
 
 def _getfilesbatch(
         remote, receivemissing, progresstick, missed, idmap, batchsize):
     # Over http(s), iterbatch is a streamy method and we can start
     # looking at results early. This means we send one (potentially
     # large) request, but then we show nice progress as we process
     # file results, rather than showing chunks of $batchsize in
     # progress.
     #
     # Over ssh, iterbatch isn't streamy because batch() wasn't
     # explicitly designed as a streaming method. In the future we
     # should probably introduce a streambatch() method upstream and
     # use that for this.
     with remote.commandexecutor() as e:
         futures = []
         for m in missed:
             futures.append(e.callcommand('x_rfl_getfile', {
                 'file': idmap[m],
                 'node': m[-40:]
             }))
 
         for i, m in enumerate(missed):
             r = futures[i].result()
             futures[i] = None # release memory
             file_ = idmap[m]
             node = m[-40:]
             receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
             progresstick()
 
 def _getfiles_optimistic(
     remote, receivemissing, progresstick, missed, idmap, step):
     remote._callstream("x_rfl_getfiles")
     i = 0
     pipeo = remote._pipeo
     pipei = remote._pipei
     while i < len(missed):
         # issue a batch of requests
         start = i
         end = min(len(missed), start + step)
         i = end
         for missingid in missed[start:end]:
             # issue new request
             versionid = missingid[-40:]
             file = idmap[missingid]
             sshrequest = "%s%s\n" % (versionid, file)
             pipeo.write(sshrequest)
         pipeo.flush()
 
         # receive batch results
         for missingid in missed[start:end]:
             versionid = missingid[-40:]
             file = idmap[missingid]
             receivemissing(pipei, file, versionid)
             progresstick()
 
     # End the command
     pipeo.write('\n')
     pipeo.flush()
 
 def _getfiles_threaded(
     remote, receivemissing, progresstick, missed, idmap, step):
     remote._callstream("getfiles")
     pipeo = remote._pipeo
     pipei = remote._pipei
 
     def writer():
         for missingid in missed:
             versionid = missingid[-40:]
             file = idmap[missingid]
             sshrequest = "%s%s\n" % (versionid, file)
             pipeo.write(sshrequest)
         pipeo.flush()
     writerthread = threading.Thread(target=writer)
     writerthread.daemon = True
     writerthread.start()
 
     for missingid in missed:
         versionid = missingid[-40:]
         file = idmap[missingid]
         receivemissing(pipei, file, versionid)
         progresstick()
 
     writerthread.join()
     # End the command
     pipeo.write('\n')
     pipeo.flush()
 
 class fileserverclient(object):
     """A client for requesting files from the remote file server.
     """
     def __init__(self, repo):
         ui = repo.ui
         self.repo = repo
         self.ui = ui
         self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
         if self.cacheprocess:
             self.cacheprocess = util.expandpath(self.cacheprocess)
 
         # This option causes remotefilelog to pass the full file path to the
         # cacheprocess instead of a hashed key.
         self.cacheprocesspasspath = ui.configbool(
             "remotefilelog", "cacheprocess.includepath")
 
         self.debugoutput = ui.configbool("remotefilelog", "debug")
 
         self.remotecache = cacheconnection()
 
     def setstore(self, datastore, historystore, writedata, writehistory):
         self.datastore = datastore
         self.historystore = historystore
         self.writedata = writedata
         self.writehistory = writehistory
 
     def _connect(self):
         return self.repo.connectionpool.get(self.repo.fallbackpath)
 
     def request(self, fileids):
         """Takes a list of filename/node pairs and fetches them from the
         server. Files are stored in the local cache.
         A list of nodes that the server couldn't find is returned.
         If the connection fails, an exception is raised.
         """
         if not self.remotecache.connected:
             self.connect()
         cache = self.remotecache
         writedata = self.writedata
 
         repo = self.repo
         total = len(fileids)
         request = "get\n%d\n" % total
         idmap = {}
         reponame = repo.name
         for file, id in fileids:
             fullid = getcachekey(reponame, file, id)
             if self.cacheprocesspasspath:
                 request += file + '\0'
             request += fullid + "\n"
             idmap[fullid] = file
 
         cache.request(request)
 
         progress = self.ui.makeprogress(_('downloading'), total=total)
         progress.update(0)
 
         missed = []
         while True:
             missingid = cache.receiveline()
             if not missingid:
                 missedset = set(missed)
                 for missingid in idmap:
                     if not missingid in missedset:
                         missed.append(missingid)
                 self.ui.warn(_("warning: cache connection closed early - " +
                                "falling back to server\n"))
                 break
             if missingid == "0":
                 break
             if missingid.startswith("_hits_"):
                 # receive progress reports
                 parts = missingid.split("_")
                 progress.increment(int(parts[2]))
                 continue
 
             missed.append(missingid)
 
         global fetchmisses
         fetchmisses += len(missed)
 
         fromcache = total - len(missed)
         progress.update(fromcache, total=total)
         self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
                     fromcache, total, hit=fromcache, total=total)
 
         oldumask = os.umask(0o002)
         try:
             # receive cache misses from master
             if missed:
                 # When verbose is true, sshpeer prints 'running ssh...'
                 # to stdout, which can interfere with some command
                 # outputs
                 verbose = self.ui.verbose
                 self.ui.verbose = False
                 try:
                     with self._connect() as conn:
                         remote = conn.peer
                         if remote.capable(
                             constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
                             if not isinstance(remote, _sshv1peer):
                                 raise error.Abort('remotefilelog requires ssh '
                                                   'servers')
                             step = self.ui.configint('remotefilelog',
                                                      'getfilesstep')
                             getfilestype = self.ui.config('remotefilelog',
                                                           'getfilestype')
                             if getfilestype == 'threaded':
                                 _getfiles = _getfiles_threaded
                             else:
                                 _getfiles = _getfiles_optimistic
                             _getfiles(remote, self.receivemissing,
                                       progress.increment, missed, idmap, step)
                         elif remote.capable("x_rfl_getfile"):
                             if remote.capable('batch'):
                                 batchdefault = 100
                             else:
                                 batchdefault = 10
                             batchsize = self.ui.configint(
                                 'remotefilelog', 'batchsize', batchdefault)
+                            self.ui.debug(
+                                b'requesting %d files from '
+                                b'remotefilelog server...\n' % len(missed))
                             _getfilesbatch(
                                 remote, self.receivemissing, progress.increment,
                                 missed, idmap, batchsize)
                         else:
                             raise error.Abort("configured remotefilelog server"
                                               " does not support remotefilelog")
 
                     self.ui.log("remotefilefetchlog",
                                 "Success\n",
                                 fetched_files = progress.pos - fromcache,
                                 total_to_fetch = total - fromcache)
                 except Exception:
                     self.ui.log("remotefilefetchlog",
                                 "Fail\n",
                                 fetched_files = progress.pos - fromcache,
                                 total_to_fetch = total - fromcache)
                     raise
                 finally:
                     self.ui.verbose = verbose
                 # send to memcache
                 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed))
                 cache.request(request)
 
             progress.complete()
 
             # mark ourselves as a user of this cache
             writedata.markrepo(self.repo.path)
         finally:
             os.umask(oldumask)
 
     def receivemissing(self, pipe, filename, node):
         line = pipe.readline()[:-1]
         if not line:
             raise error.ResponseError(_("error downloading file contents:"),
                                       _("connection closed early"))
         size = int(line)
         data = pipe.read(size)
         if len(data) != size:
             raise error.ResponseError(_("error downloading file contents:"),
                                       _("only received %s of %s bytes")
                                       % (len(data), size))
 
         self.writedata.addremotefilelognode(filename, bin(node),
                                             zlib.decompress(data))
 
     def connect(self):
         if self.cacheprocess:
             cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
             self.remotecache.connect(cmd)
         else:
             # If no cache process is specified, we fake one that always
             # returns cache misses. This enables tests to run easily
             # and may eventually allow us to be a drop in replacement
             # for the largefiles extension.
             class simplecache(object):
                 def __init__(self):
                     self.missingids = []
                     self.connected = True
 
                 def close(self):
                     pass
 
                 def request(self, value, flush=True):
                     lines = value.split("\n")
                     if lines[0] != "get":
                         return
                     self.missingids = lines[2:-1]
                     self.missingids.append('0')
 
                 def receiveline(self):
                     if len(self.missingids) > 0:
                         return self.missingids.pop(0)
                     return None
 
             self.remotecache = simplecache()
 
     def close(self):
         if fetches:
             msg = ("%d files fetched over %d fetches - " +
                    "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
                        fetched,
                        fetches,
                        fetchmisses,
                        float(fetched - fetchmisses) / float(fetched) * 100.0,
                        fetchcost)
             if self.debugoutput:
                 self.ui.warn(msg)
             self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
                         remotefilelogfetched=fetched,
                         remotefilelogfetches=fetches,
                         remotefilelogfetchmisses=fetchmisses,
                         remotefilelogfetchtime=fetchcost * 1000)
 
         if self.remotecache.connected:
             self.remotecache.close()
 
     def prefetch(self, fileids, force=False, fetchdata=True,
                  fetchhistory=False):
         """downloads the given file versions to the cache
         """
         repo = self.repo
         idstocheck = []
         for file, id in fileids:
             # hack
             # - we don't use .hgtags
             # - workingctx produces ids with length 42,
             #   which we skip since they aren't in any cache
             if (file == '.hgtags' or len(id) == 42
                 or not repo.shallowmatch(file)):
                 continue
 
             idstocheck.append((file, bin(id)))
 
         datastore = self.datastore
         historystore = self.historystore
         if force:
             datastore = contentstore.unioncontentstore(*repo.shareddatastores)
             historystore = metadatastore.unionmetadatastore(
                 *repo.sharedhistorystores)
 
         missingids = set()
         if fetchdata:
             missingids.update(datastore.getmissing(idstocheck))
         if fetchhistory:
             missingids.update(historystore.getmissing(idstocheck))
 
         # partition missing nodes into nullid and not-nullid so we can
         # warn about this filtering potentially shadowing bugs.
         nullids = len([None for unused, id in missingids if id == nullid])
         if nullids:
             missingids = [(f, id) for f, id in missingids if id != nullid]
             repo.ui.develwarn(
                 ('remotefilelog not fetching %d null revs'
                  ' - this is likely hiding bugs' % nullids),
                 config='remotefilelog-ext')
         if missingids:
             global fetches, fetched, fetchcost
             fetches += 1
 
             # We want to be able to detect excess individual file downloads, so
             # let's log that information for debugging.
             if fetches >= 15 and fetches < 18:
                 if fetches == 15:
                     fetchwarning = self.ui.config('remotefilelog',
                                                   'fetchwarning')
                     if fetchwarning:
                         self.ui.warn(fetchwarning + '\n')
                 self.logstacktrace()
             missingids = [(file, hex(id)) for file, id in sorted(missingids)]
             fetched += len(missingids)
             start = time.time()
             missingids = self.request(missingids)
             if missingids:
                 raise error.Abort(_("unable to download %d files") %
                                   len(missingids))
             fetchcost += time.time() - start
         self._lfsprefetch(fileids)
 
     def _lfsprefetch(self, fileids):
         if not _lfsmod or not util.safehasattr(
                 self.repo.svfs, 'lfslocalblobstore'):
             return
         if not _lfsmod.wrapper.candownload(self.repo):
             return
         pointers = []
         store = self.repo.svfs.lfslocalblobstore
         for file, id in fileids:
             node = bin(id)
             rlog = self.repo.file(file)
             if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                 text = rlog.revision(node, raw=True)
                 p = _lfsmod.pointer.deserialize(text)
                 oid = p.oid()
                 if not store.has(oid):
                     pointers.append(p)
         if len(pointers) > 0:
             self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
             assert all(store.has(p.oid()) for p in pointers)
 
     def logstacktrace(self):
         import traceback
         self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
                     ''.join(traceback.format_stack()))
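
The changed hunk is easy to miss in the full-file context above: the three lines marked with + add a ui.debug() call before the batched fetch, so operators can see when a remotefilelog server round trip is about to happen. Mercurial's ui.debug() discards the message unless debug output is enabled (for example via --debug or ui.debug=true), so the new line is silent in normal runs. A minimal sketch of that gate, using a made-up fakeui class rather than Mercurial's real ui:

import sys

class fakeui(object):
    # Toy model of the debug gate only; Mercurial's ui class is far richer.
    def __init__(self, debugflag=False):
        self.debugflag = debugflag

    def debug(self, msg):
        # Like ui.debug(): emit the message only when debugging is enabled.
        if self.debugflag:
            sys.stdout.write(msg)

missed = ['key1', 'key2', 'key3']
fakeui(debugflag=True).debug(
    'requesting %d files from remotefilelog server...\n' % len(missed))
# prints: requesting 3 files from remotefilelog server...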
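Separately, the cache keys that request() streams to the cache process come from getcachekey() near the top of the file: the file path is SHA-1 hashed and fanned out into a two-level directory layout under the repo name. A standalone copy for illustration; the repo name, path, and node below are made-up values:

import hashlib
import os

def getcachekey(reponame, file, id):
    # Same computation as getcachekey() in the diff: sha1().hexdigest()
    # is equivalent to node.hex(hashlib.sha1(file).digest()).
    pathhash = hashlib.sha1(file).hexdigest()
    return os.path.join(reponame, pathhash[:2], pathhash[2:], id)

print(getcachekey('somerepo', b'foo/bar.py', 'ab' * 20))
# somerepo/<2 hex chars>/<remaining 38 hex chars>/<40-hex file node>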
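The request()/receiveline() pair above implies a line-oriented protocol between the client and the external cacheprocess: a verb ("get", "set", or "exit"), a count, then that many key lines (each prefixed with the file path and a NUL byte when remotefilelog.cacheprocess.includepath is set); for "get" the process answers with the keys it is missing, optionally interleaved with "_hits_<n>_..." progress lines, terminated by a "0" line. A toy always-miss process under those assumptions, mirroring the in-process simplecache fallback rather than any real cacheprocess implementation:

import sys

def main():
    while True:
        verb = sys.stdin.readline().rstrip('\n')
        if not verb or verb == 'exit':
            break
        count = int(sys.stdin.readline())
        keys = [sys.stdin.readline().rstrip('\n') for _ in range(count)]
        if verb == 'get':
            for key in keys:
                sys.stdout.write(key + '\n')  # declare every key a miss
            sys.stdout.write('0\n')           # terminator the client waits for
            sys.stdout.flush()
        # a real process would store the payloads named by 'set' requests

if __name__ == '__main__':
    main()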
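Finally, the per-file framing that receivemissing() parses is visible above: an ASCII decimal size, a newline, then exactly that many bytes of zlib-compressed content (the same shape _getfilesbatch() rebuilds with io.BytesIO). A round-trip sketch of just the framing, leaving out remotefilelog's internal blob layout:

import io
import zlib

def encodeframe(payload):
    # size line followed by exactly `size` bytes of compressed data
    compressed = zlib.compress(payload)
    return b'%d\n%s' % (len(compressed), compressed)

def readframe(pipe):
    # mirrors receivemissing(): read the size line, then the payload
    size = int(pipe.readline()[:-1])
    data = pipe.read(size)
    if len(data) != size:
        raise ValueError('only received %d of %d bytes' % (len(data), size))
    return zlib.decompress(data)

pipe = io.BytesIO(encodeframe(b'file contents'))
assert readframe(pipe) == b'file contents'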