remotefilelog: reduce use of "count" container...
Martin von Zweigbergk
r40885:fcee112f default
@@ -1,587 +1,587 @@
# fileserverclient.py - client for communicating with the cache process
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import hashlib
import io
import os
import threading
import time
import zlib

from mercurial.i18n import _
from mercurial.node import bin, hex, nullid
from mercurial import (
    error,
    node,
    pycompat,
    revlog,
    sshpeer,
    util,
    wireprotov1peer,
)
from mercurial.utils import procutil

from . import (
    constants,
    contentstore,
    metadatastore,
)

_sshv1peer = sshpeer.sshv1peer

# Statistics for debugging
fetchcost = 0
fetches = 0
fetched = 0
fetchmisses = 0

_lfsmod = None

def getcachekey(reponame, file, id):
    pathhash = node.hex(hashlib.sha1(file).digest())
    return os.path.join(reponame, pathhash[:2], pathhash[2:], id)

def getlocalkey(file, id):
    pathhash = node.hex(hashlib.sha1(file).digest())
    return os.path.join(pathhash, id)
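
The two helpers above shard cache entries by the SHA-1 of the file path: two hex digits of directory fan-out, the remaining 38 as a subdirectory, and the file node as the leaf. A minimal illustration of the resulting shared-cache key (repo name and node are made up):

    import hashlib
    import os

    # Mirrors getcachekey(): myrepo/<2-char fan-out>/<38-char rest>/<node>
    pathhash = hashlib.sha1(b'fbcode/foo/bar.c').hexdigest()
    key = os.path.join('myrepo', pathhash[:2], pathhash[2:], '0' * 40)

getlocalkey() is the same idea for the per-repo local cache, storing under the full, unsplit pathhash and without the repo-name prefix.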

def peersetup(ui, peer):

    class remotefilepeer(peer.__class__):
        @wireprotov1peer.batchable
        def x_rfl_getfile(self, file, node):
            if not self.capable('x_rfl_getfile'):
                raise error.Abort(
                    'configured remotefile server does not support getfile')
            f = wireprotov1peer.future()
            yield {'file': file, 'node': node}, f
            code, data = f.value.split('\0', 1)
            if int(code):
                raise error.LookupError(file, node, data)
            yield data

        @wireprotov1peer.batchable
        def x_rfl_getflogheads(self, path):
            if not self.capable('x_rfl_getflogheads'):
                raise error.Abort('configured remotefile server does not '
                                  'support getflogheads')
            f = wireprotov1peer.future()
            yield {'path': path}, f
            heads = f.value.split('\n') if f.value else []
            yield heads

        def _updatecallstreamopts(self, command, opts):
            if command != 'getbundle':
                return
            if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                not in self.capabilities()):
                return
            if not util.safehasattr(self, '_localrepo'):
                return
            if (constants.SHALLOWREPO_REQUIREMENT
                not in self._localrepo.requirements):
                return

            bundlecaps = opts.get('bundlecaps')
            if bundlecaps:
                bundlecaps = [bundlecaps]
            else:
                bundlecaps = []

            # shallow, includepattern, and excludepattern are a hacky way of
            # carrying over data from the local repo to this getbundle
            # command. We need to do it this way because bundle1 getbundle
            # doesn't provide any other place we can hook in to manipulate
            # getbundle args before it goes across the wire. Once we get rid
            # of bundle1, we can use bundle2's _pullbundle2extraprepare to
            # do this more cleanly.
            bundlecaps.append(constants.BUNDLE2_CAPABLITY)
            if self._localrepo.includepattern:
                patterns = '\0'.join(self._localrepo.includepattern)
                includecap = "includepattern=" + patterns
                bundlecaps.append(includecap)
            if self._localrepo.excludepattern:
                patterns = '\0'.join(self._localrepo.excludepattern)
                excludecap = "excludepattern=" + patterns
                bundlecaps.append(excludecap)
            opts['bundlecaps'] = ','.join(bundlecaps)

        def _sendrequest(self, command, args, **opts):
            self._updatecallstreamopts(command, args)
            return super(remotefilepeer, self)._sendrequest(command, args,
                                                            **opts)

        def _callstream(self, command, **opts):
            supertype = super(remotefilepeer, self)
            if not util.safehasattr(supertype, '_sendrequest'):
                self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
            return super(remotefilepeer, self)._callstream(command, **opts)

    peer.__class__ = remotefilepeer
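
For reference, the x_rfl_getfile reply that the batchable above consumes is a status code and a payload separated by a NUL byte; a zero code means the payload is file data, anything else means it is an error message. A minimal sketch of that parse (parsegetfile is a hypothetical helper, not part of the extension):

    def parsegetfile(value):
        # Reply framing: "<code>\0<payload>"
        code, data = value.split('\0', 1)
        if int(code):
            raise LookupError(data)
        return data

    assert parsegetfile('0\0contents') == 'contents'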

class cacheconnection(object):
    """The connection for communicating with the remote cache. Performs
    gets and sets by communicating with an external process that has the
    cache-specific implementation.
    """
    def __init__(self):
        self.pipeo = self.pipei = self.pipee = None
        self.subprocess = None
        self.connected = False

    def connect(self, cachecommand):
        if self.pipeo:
            raise error.Abort(_("cache connection already open"))
        self.pipei, self.pipeo, self.pipee, self.subprocess = \
            procutil.popen4(cachecommand)
        self.connected = True

    def close(self):
        def tryclose(pipe):
            try:
                pipe.close()
            except Exception:
                pass
        if self.connected:
            try:
                self.pipei.write("exit\n")
            except Exception:
                pass
            tryclose(self.pipei)
            self.pipei = None
            tryclose(self.pipeo)
            self.pipeo = None
            tryclose(self.pipee)
            self.pipee = None
            try:
                # Wait for process to terminate, making sure to avoid deadlock.
                # See https://docs.python.org/2/library/subprocess.html for
                # warnings about wait() and deadlocking.
                self.subprocess.communicate()
            except Exception:
                pass
            self.subprocess = None
        self.connected = False

    def request(self, request, flush=True):
        if self.connected:
            try:
                self.pipei.write(request)
                if flush:
                    self.pipei.flush()
            except IOError:
                self.close()

    def receiveline(self):
        if not self.connected:
            return None
        try:
            result = self.pipeo.readline()[:-1]
            if not result:
                self.close()
        except IOError:
            self.close()

        return result
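
The protocol this class speaks over the child's stdin/stdout is line-oriented: a "get" request sends the verb, a count, then that many cache keys, and the process replies with one line per missing key, optional "_hits_<n>_" progress lines, and a terminating "0"; a "set" request announces keys that were just written; "exit" ends the session. A toy cache process consistent with that framing (it caches nothing, so every key is a miss) might look like:

    #!/usr/bin/env python
    # Toy remotefilelog cacheprocess: declares every requested key a miss.
    # The real cache path arrives as argv[1]; this sketch ignores it.
    import sys

    def main():
        while True:
            command = sys.stdin.readline()[:-1]
            if not command or command == "exit":
                break
            count = int(sys.stdin.readline())
            keys = [sys.stdin.readline()[:-1] for _ in range(count)]
            if command == "get":
                for key in keys:
                    sys.stdout.write(key + "\n")  # everything is a miss
                sys.stdout.write("0\n")           # terminator the client expects
                sys.stdout.flush()
            # "set" lists keys the client just wrote; nothing to do here.

    if __name__ == "__main__":
        main()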

def _getfilesbatch(
        remote, receivemissing, progresstick, missed, idmap, batchsize):
    # Over http(s), iterbatch is a streamy method and we can start
    # looking at results early. This means we send one (potentially
    # large) request, but then we show nice progress as we process
    # file results, rather than showing chunks of $batchsize in
    # progress.
    #
    # Over ssh, iterbatch isn't streamy because batch() wasn't
    # explicitly designed as a streaming method. In the future we
    # should probably introduce a streambatch() method upstream and
    # use that for this.
    with remote.commandexecutor() as e:
        futures = []
        for m in missed:
            futures.append(e.callcommand('x_rfl_getfile', {
                'file': idmap[m],
                'node': m[-40:]
            }))

        for i, m in enumerate(missed):
            r = futures[i].result()
            futures[i] = None  # release memory
            file_ = idmap[m]
            node = m[-40:]
            receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
            progresstick()

def _getfiles_optimistic(
        remote, receivemissing, progresstick, missed, idmap, step):
    remote._callstream("x_rfl_getfiles")
    i = 0
    pipeo = remote._pipeo
    pipei = remote._pipei
    while i < len(missed):
        # issue a batch of requests
        start = i
        end = min(len(missed), start + step)
        i = end
        for missingid in missed[start:end]:
            # issue new request
            versionid = missingid[-40:]
            file = idmap[missingid]
            sshrequest = "%s%s\n" % (versionid, file)
            pipeo.write(sshrequest)
        pipeo.flush()

        # receive batch results
        for missingid in missed[start:end]:
            versionid = missingid[-40:]
            file = idmap[missingid]
            receivemissing(pipei, file, versionid)
            progresstick()

    # End the command
    pipeo.write('\n')
    pipeo.flush()

def _getfiles_threaded(
        remote, receivemissing, progresstick, missed, idmap, step):
    remote._callstream("getfiles")
    pipeo = remote._pipeo
    pipei = remote._pipei

    def writer():
        for missingid in missed:
            versionid = missingid[-40:]
            file = idmap[missingid]
            sshrequest = "%s%s\n" % (versionid, file)
            pipeo.write(sshrequest)
        pipeo.flush()
    writerthread = threading.Thread(target=writer)
    writerthread.daemon = True
    writerthread.start()

    for missingid in missed:
        versionid = missingid[-40:]
        file = idmap[missingid]
        receivemissing(pipei, file, versionid)
        progresstick()

    writerthread.join()
    # End the command
    pipeo.write('\n')
    pipeo.flush()
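
Both ssh variants share the same stream framing: each request line is the 40-character hex file node immediately followed by the file path, and a bare newline ends the whole command. _getfiles_optimistic pipelines requests in windows of `step`, while _getfiles_threaded writes every request from a daemon thread and drains replies concurrently. What goes over the pipe for two files, with made-up nodes and paths:

    # "<40-hex node><path>\n" per file; a blank line terminates the command.
    request = (
        "1089b65ba3a2e6ae9c6a21e56f1c983e4ae0091e" "fbcode/foo.c\n"
        "2d9e5a244d1e9d79b0b8e9a35a7897d9f23a5df1" "lib/bar.py\n"
        "\n"
    )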

class fileserverclient(object):
    """A client for requesting files from the remote file server.
    """
    def __init__(self, repo):
        ui = repo.ui
        self.repo = repo
        self.ui = ui
        self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
        if self.cacheprocess:
            self.cacheprocess = util.expandpath(self.cacheprocess)

        # This option causes remotefilelog to pass the full file path to the
        # cacheprocess instead of a hashed key.
        self.cacheprocesspasspath = ui.configbool(
            "remotefilelog", "cacheprocess.includepath")

        self.debugoutput = ui.configbool("remotefilelog", "debug")

        self.remotecache = cacheconnection()

    def setstore(self, datastore, historystore, writedata, writehistory):
        self.datastore = datastore
        self.historystore = historystore
        self.writedata = writedata
        self.writehistory = writehistory

    def _connect(self):
        return self.repo.connectionpool.get(self.repo.fallbackpath)

    def request(self, fileids):
        """Takes a list of filename/node pairs and fetches them from the
        server. Files are stored in the local cache.
        A list of nodes that the server couldn't find is returned.
        If the connection fails, an exception is raised.
        """
        if not self.remotecache.connected:
            self.connect()
        cache = self.remotecache
        writedata = self.writedata

        repo = self.repo
        total = len(fileids)
        request = "get\n%d\n" % total
        idmap = {}
        reponame = repo.name
        for file, id in fileids:
            fullid = getcachekey(reponame, file, id)
            if self.cacheprocesspasspath:
                request += file + '\0'
            request += fullid + "\n"
            idmap[fullid] = file

        cache.request(request)

        progress = self.ui.makeprogress(_('downloading'), total=total)
        progress.update(0)

        missed = []
        count = 0
        while True:
            missingid = cache.receiveline()
            if not missingid:
                missedset = set(missed)
                for missingid in idmap:
                    if not missingid in missedset:
                        missed.append(missingid)
                self.ui.warn(_("warning: cache connection closed early - " +
                               "falling back to server\n"))
                break
            if missingid == "0":
                break
            if missingid.startswith("_hits_"):
                # receive progress reports
                parts = missingid.split("_")
                count += int(parts[2])
                progress.update(count)
                continue

            missed.append(missingid)

        global fetchmisses
        fetchmisses += len(missed)

        fromcache = total - len(missed)
        count = [fromcache]
-       progress.update(count[0], total=total)
+       progress.update(fromcache, total=total)
        self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
-                   count[0], total, hit=count[0], total=total)
+                   fromcache, total, hit=fromcache, total=total)

        oldumask = os.umask(0o002)
        try:
            # receive cache misses from master
            if missed:
                def progresstick():
                    count[0] += 1
                    progress.update(count[0])
                # When verbose is true, sshpeer prints 'running ssh...'
                # to stdout, which can interfere with some command
                # outputs
                verbose = self.ui.verbose
                self.ui.verbose = False
                try:
                    with self._connect() as conn:
                        remote = conn.peer
                        if remote.capable(
                                constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
                            if not isinstance(remote, _sshv1peer):
                                raise error.Abort('remotefilelog requires ssh '
                                                  'servers')
                            step = self.ui.configint('remotefilelog',
                                                     'getfilesstep')
                            getfilestype = self.ui.config('remotefilelog',
                                                          'getfilestype')
                            if getfilestype == 'threaded':
                                _getfiles = _getfiles_threaded
                            else:
                                _getfiles = _getfiles_optimistic
                            _getfiles(remote, self.receivemissing, progresstick,
                                      missed, idmap, step)
                        elif remote.capable("x_rfl_getfile"):
                            if remote.capable('batch'):
                                batchdefault = 100
                            else:
                                batchdefault = 10
                            batchsize = self.ui.configint(
                                'remotefilelog', 'batchsize', batchdefault)
                            _getfilesbatch(
                                remote, self.receivemissing, progresstick,
                                missed, idmap, batchsize)
                        else:
                            raise error.Abort("configured remotefilelog server"
                                              " does not support remotefilelog")

                    self.ui.log("remotefilefetchlog",
                                "Success\n",
                                fetched_files = count[0] - fromcache,
                                total_to_fetch = total - fromcache)
                except Exception:
                    self.ui.log("remotefilefetchlog",
                                "Fail\n",
                                fetched_files = count[0] - fromcache,
                                total_to_fetch = total - fromcache)
                    raise
                finally:
                    self.ui.verbose = verbose
                # send to memcache
                request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed))
                cache.request(request)

            progress.complete()

            # mark ourselves as a user of this cache
            writedata.markrepo(self.repo.path)
        finally:
            os.umask(oldumask)
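
Putting the pieces together, a request() for two file versions sends the cache process something like the following (repo name and keys are illustrative, path hashes shortened for display), then treats every line echoed back before the "0" terminator as a miss to fetch from the server:

    request = (
        "get\n"
        "2\n"
        "myrepo/3f/9c...e1/1089b65ba3a2e6ae9c6a21e56f1c983e4ae0091e\n"
        "myrepo/a0/77...4b/2d9e5a244d1e9d79b0b8e9a35a7897d9f23a5df1\n"
    )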

    def receivemissing(self, pipe, filename, node):
        line = pipe.readline()[:-1]
        if not line:
            raise error.ResponseError(_("error downloading file contents:"),
                                      _("connection closed early"))
        size = int(line)
        data = pipe.read(size)
        if len(data) != size:
            raise error.ResponseError(_("error downloading file contents:"),
                                      _("only received %s of %s bytes")
                                      % (len(data), size))

        self.writedata.addremotefilelognode(filename, bin(node),
                                            zlib.decompress(data))
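
Each per-file reply is framed as a decimal byte count on its own line followed by exactly that many bytes of zlib-compressed payload; _getfilesbatch() above synthesizes the same frame around batch results before handing them to this method. A self-contained sketch of building and consuming one frame:

    import io
    import zlib

    payload = zlib.compress(b'file text plus ancestry metadata')
    frame = io.BytesIO(b'%d\n%s' % (len(payload), payload))

    # The same steps receivemissing() performs:
    size = int(frame.readline()[:-1])
    data = frame.read(size)
    assert zlib.decompress(data) == b'file text plus ancestry metadata'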

    def connect(self):
        if self.cacheprocess:
            cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
            self.remotecache.connect(cmd)
        else:
            # If no cache process is specified, we fake one that always
            # returns cache misses. This enables tests to run easily
            # and may eventually allow us to be a drop in replacement
            # for the largefiles extension.
            class simplecache(object):
                def __init__(self):
                    self.missingids = []
                    self.connected = True

                def close(self):
                    pass

                def request(self, value, flush=True):
                    lines = value.split("\n")
                    if lines[0] != "get":
                        return
                    self.missingids = lines[2:-1]
                    self.missingids.append('0')

                def receiveline(self):
                    if len(self.missingids) > 0:
                        return self.missingids.pop(0)
                    return None

            self.remotecache = simplecache()

    def close(self):
        if fetches:
            msg = ("%d files fetched over %d fetches - " +
                   "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
                       fetched,
                       fetches,
                       fetchmisses,
                       float(fetched - fetchmisses) / float(fetched) * 100.0,
                       fetchcost)
            if self.debugoutput:
                self.ui.warn(msg)
            self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
                        remotefilelogfetched=fetched,
                        remotefilelogfetches=fetches,
                        remotefilelogfetchmisses=fetchmisses,
                        remotefilelogfetchtime=fetchcost * 1000)

        if self.remotecache.connected:
            self.remotecache.close()
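
As a worked example of that summary line: with fetched=120, fetches=3, fetchmisses=30 and fetchcost=1.5 (made-up numbers), the hit ratio is (120 - 30) / 120 * 100 = 75.00%, so the message renders as:

    120 files fetched over 3 fetches - (30 misses, 75.00% hit ratio) over 1.50s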

    def prefetch(self, fileids, force=False, fetchdata=True,
                 fetchhistory=False):
        """downloads the given file versions to the cache
        """
        repo = self.repo
        idstocheck = []
        for file, id in fileids:
            # hack
            # - we don't use .hgtags
            # - workingctx produces ids with length 42,
            #   which we skip since they aren't in any cache
            if (file == '.hgtags' or len(id) == 42
                or not repo.shallowmatch(file)):
                continue

            idstocheck.append((file, bin(id)))

        datastore = self.datastore
        historystore = self.historystore
        if force:
            datastore = contentstore.unioncontentstore(*repo.shareddatastores)
            historystore = metadatastore.unionmetadatastore(
                *repo.sharedhistorystores)

        missingids = set()
        if fetchdata:
            missingids.update(datastore.getmissing(idstocheck))
        if fetchhistory:
            missingids.update(historystore.getmissing(idstocheck))

        # partition missing nodes into nullid and not-nullid so we can
        # warn about this filtering potentially shadowing bugs.
        nullids = len([None for unused, id in missingids if id == nullid])
        if nullids:
            missingids = [(f, id) for f, id in missingids if id != nullid]
            repo.ui.develwarn(
                ('remotefilelog not fetching %d null revs'
                 ' - this is likely hiding bugs' % nullids),
                config='remotefilelog-ext')
        if missingids:
            global fetches, fetched, fetchcost
            fetches += 1

            # We want to be able to detect excess individual file downloads, so
            # let's log that information for debugging.
            if fetches >= 15 and fetches < 18:
                if fetches == 15:
                    fetchwarning = self.ui.config('remotefilelog',
                                                  'fetchwarning')
                    if fetchwarning:
                        self.ui.warn(fetchwarning + '\n')
                self.logstacktrace()
            missingids = [(file, hex(id)) for file, id in missingids]
            fetched += len(missingids)
            start = time.time()
            missingids = self.request(missingids)
            if missingids:
                raise error.Abort(_("unable to download %d files") %
                                  len(missingids))
            fetchcost += time.time() - start
        self._lfsprefetch(fileids)
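
The extension attaches this client to the shallow repo as repo.fileservice, so a typical caller warms the cache before reading file contents. A hedged usage sketch, assuming repo is a shallow repo and ctx one of its changectxs:

    from mercurial.node import hex

    # Prefetch the file versions touched by a changeset so later reads
    # hit the local cache instead of going to the server one by one.
    fileids = [(f, hex(ctx.filenode(f))) for f in ctx.files() if f in ctx]
    repo.fileservice.prefetch(fileids, fetchdata=True, fetchhistory=False)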

    def _lfsprefetch(self, fileids):
        if not _lfsmod or not util.safehasattr(
                self.repo.svfs, 'lfslocalblobstore'):
            return
        if not _lfsmod.wrapper.candownload(self.repo):
            return
        pointers = []
        store = self.repo.svfs.lfslocalblobstore
        for file, id in fileids:
            node = bin(id)
            rlog = self.repo.file(file)
            if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                text = rlog.revision(node, raw=True)
                p = _lfsmod.pointer.deserialize(text)
                oid = p.oid()
                if not store.has(oid):
                    pointers.append(p)
        if len(pointers) > 0:
            self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
            assert all(store.has(p.oid()) for p in pointers)

    def logstacktrace(self):
        import traceback
        self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
                    ''.join(traceback.format_stack()))