remotefilelog: remove an unnecessary update of "count" container...
Martin von Zweigbergk
r40883:70de33b9 default
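The hunk below is in fileserverclient.request(). There, count is a one-element list used as a mutable counter so the nested progresstick() closure can increment it (this is Python 2 code, which lacks nonlocal). The removed line reassigned count[0] = len(missed) just before formatting the "set" request, and nothing read count[0] afterwards, so the assignment was dead; the new code formats with len(missed) directly. A minimal, self-contained sketch of the closure-counter pattern (illustrative names, not code from this commit):

def fetch(missed):
    # Python 2 closures cannot rebind an outer local, so a mutable
    # one-element list stands in for a plain integer counter.
    count = [0]

    def progresstick():
        count[0] += 1  # rebinding a bare int here would raise UnboundLocalError

    for _item in missed:
        progresstick()

    # The removed line stored len(missed) into count[0] only to read it
    # back on the next line; using len(missed) directly is equivalent.
    return "set\n%d\n%s\n" % (len(missed), "\n".join(missed))

print(fetch(["key1", "key2"]))  # prints: set / 2 / key1 / key2 (newline-separated)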
@@ -1,588 +1,587 @@
# fileserverclient.py - client for communicating with the cache process
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import hashlib
import io
import os
import threading
import time
import zlib

from mercurial.i18n import _
from mercurial.node import bin, hex, nullid
from mercurial import (
    error,
    node,
    pycompat,
    revlog,
    sshpeer,
    util,
    wireprotov1peer,
)
from mercurial.utils import procutil

from . import (
    constants,
    contentstore,
    metadatastore,
)

_sshv1peer = sshpeer.sshv1peer

# Statistics for debugging
fetchcost = 0
fetches = 0
fetched = 0
fetchmisses = 0

_lfsmod = None

def getcachekey(reponame, file, id):
    pathhash = node.hex(hashlib.sha1(file).digest())
    return os.path.join(reponame, pathhash[:2], pathhash[2:], id)

def getlocalkey(file, id):
    pathhash = node.hex(hashlib.sha1(file).digest())
    return os.path.join(pathhash, id)

def peersetup(ui, peer):

    class remotefilepeer(peer.__class__):
        @wireprotov1peer.batchable
        def x_rfl_getfile(self, file, node):
            if not self.capable('x_rfl_getfile'):
                raise error.Abort(
                    'configured remotefile server does not support getfile')
            f = wireprotov1peer.future()
            yield {'file': file, 'node': node}, f
            code, data = f.value.split('\0', 1)
            if int(code):
                raise error.LookupError(file, node, data)
            yield data

        @wireprotov1peer.batchable
        def x_rfl_getflogheads(self, path):
            if not self.capable('x_rfl_getflogheads'):
                raise error.Abort('configured remotefile server does not '
                                  'support getflogheads')
            f = wireprotov1peer.future()
            yield {'path': path}, f
            heads = f.value.split('\n') if f.value else []
            yield heads

        def _updatecallstreamopts(self, command, opts):
            if command != 'getbundle':
                return
            if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                not in self.capabilities()):
                return
            if not util.safehasattr(self, '_localrepo'):
                return
            if (constants.SHALLOWREPO_REQUIREMENT
                not in self._localrepo.requirements):
                return

            bundlecaps = opts.get('bundlecaps')
            if bundlecaps:
                bundlecaps = [bundlecaps]
            else:
                bundlecaps = []

            # shallow, includepattern, and excludepattern are a hacky way of
            # carrying over data from the local repo to this getbundle
            # command. We need to do it this way because bundle1 getbundle
            # doesn't provide any other place we can hook in to manipulate
            # getbundle args before it goes across the wire. Once we get rid
            # of bundle1, we can use bundle2's _pullbundle2extraprepare to
            # do this more cleanly.
            bundlecaps.append(constants.BUNDLE2_CAPABLITY)
            if self._localrepo.includepattern:
                patterns = '\0'.join(self._localrepo.includepattern)
                includecap = "includepattern=" + patterns
                bundlecaps.append(includecap)
            if self._localrepo.excludepattern:
                patterns = '\0'.join(self._localrepo.excludepattern)
                excludecap = "excludepattern=" + patterns
                bundlecaps.append(excludecap)
            opts['bundlecaps'] = ','.join(bundlecaps)

        def _sendrequest(self, command, args, **opts):
            self._updatecallstreamopts(command, args)
            return super(remotefilepeer, self)._sendrequest(command, args,
                                                            **opts)

        def _callstream(self, command, **opts):
            supertype = super(remotefilepeer, self)
            if not util.safehasattr(supertype, '_sendrequest'):
                self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
            return super(remotefilepeer, self)._callstream(command, **opts)

    peer.__class__ = remotefilepeer

class cacheconnection(object):
    """The connection for communicating with the remote cache. Performs
    gets and sets by communicating with an external process that has the
    cache-specific implementation.
    """
    def __init__(self):
        self.pipeo = self.pipei = self.pipee = None
        self.subprocess = None
        self.connected = False

    def connect(self, cachecommand):
        if self.pipeo:
            raise error.Abort(_("cache connection already open"))
        self.pipei, self.pipeo, self.pipee, self.subprocess = \
            procutil.popen4(cachecommand)
        self.connected = True

    def close(self):
        def tryclose(pipe):
            try:
                pipe.close()
            except Exception:
                pass
        if self.connected:
            try:
                self.pipei.write("exit\n")
            except Exception:
                pass
            tryclose(self.pipei)
            self.pipei = None
            tryclose(self.pipeo)
            self.pipeo = None
            tryclose(self.pipee)
            self.pipee = None
            try:
                # Wait for process to terminate, making sure to avoid deadlock.
                # See https://docs.python.org/2/library/subprocess.html for
                # warnings about wait() and deadlocking.
                self.subprocess.communicate()
            except Exception:
                pass
            self.subprocess = None
        self.connected = False

    def request(self, request, flush=True):
        if self.connected:
            try:
                self.pipei.write(request)
                if flush:
                    self.pipei.flush()
            except IOError:
                self.close()

    def receiveline(self):
        if not self.connected:
            return None
        try:
            result = self.pipeo.readline()[:-1]
            if not result:
                self.close()
        except IOError:
            self.close()

        return result

def _getfilesbatch(
        remote, receivemissing, progresstick, missed, idmap, batchsize):
    # Over http(s), iterbatch is a streamy method and we can start
    # looking at results early. This means we send one (potentially
    # large) request, but then we show nice progress as we process
    # file results, rather than showing chunks of $batchsize in
    # progress.
    #
    # Over ssh, iterbatch isn't streamy because batch() wasn't
    # explicitly designed as a streaming method. In the future we
    # should probably introduce a streambatch() method upstream and
    # use that for this.
    with remote.commandexecutor() as e:
        futures = []
        for m in missed:
            futures.append(e.callcommand('x_rfl_getfile', {
                'file': idmap[m],
                'node': m[-40:]
            }))

        for i, m in enumerate(missed):
            r = futures[i].result()
            futures[i] = None  # release memory
            file_ = idmap[m]
            node = m[-40:]
            receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
            progresstick()

def _getfiles_optimistic(
        remote, receivemissing, progresstick, missed, idmap, step):
    remote._callstream("x_rfl_getfiles")
    i = 0
    pipeo = remote._pipeo
    pipei = remote._pipei
    while i < len(missed):
        # issue a batch of requests
        start = i
        end = min(len(missed), start + step)
        i = end
        for missingid in missed[start:end]:
            # issue new request
            versionid = missingid[-40:]
            file = idmap[missingid]
            sshrequest = "%s%s\n" % (versionid, file)
            pipeo.write(sshrequest)
        pipeo.flush()

        # receive batch results
        for missingid in missed[start:end]:
            versionid = missingid[-40:]
            file = idmap[missingid]
            receivemissing(pipei, file, versionid)
            progresstick()

    # End the command
    pipeo.write('\n')
    pipeo.flush()

def _getfiles_threaded(
        remote, receivemissing, progresstick, missed, idmap, step):
    remote._callstream("getfiles")
    pipeo = remote._pipeo
    pipei = remote._pipei

    def writer():
        for missingid in missed:
            versionid = missingid[-40:]
            file = idmap[missingid]
            sshrequest = "%s%s\n" % (versionid, file)
            pipeo.write(sshrequest)
        pipeo.flush()
    writerthread = threading.Thread(target=writer)
    writerthread.daemon = True
    writerthread.start()

    for missingid in missed:
        versionid = missingid[-40:]
        file = idmap[missingid]
        receivemissing(pipei, file, versionid)
        progresstick()

    writerthread.join()
    # End the command
    pipeo.write('\n')
    pipeo.flush()

class fileserverclient(object):
    """A client for requesting files from the remote file server.
    """
    def __init__(self, repo):
        ui = repo.ui
        self.repo = repo
        self.ui = ui
        self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
        if self.cacheprocess:
            self.cacheprocess = util.expandpath(self.cacheprocess)

        # This option causes remotefilelog to pass the full file path to the
        # cacheprocess instead of a hashed key.
        self.cacheprocesspasspath = ui.configbool(
            "remotefilelog", "cacheprocess.includepath")

        self.debugoutput = ui.configbool("remotefilelog", "debug")

        self.remotecache = cacheconnection()

    def setstore(self, datastore, historystore, writedata, writehistory):
        self.datastore = datastore
        self.historystore = historystore
        self.writedata = writedata
        self.writehistory = writehistory

    def _connect(self):
        return self.repo.connectionpool.get(self.repo.fallbackpath)

    def request(self, fileids):
        """Takes a list of filename/node pairs and fetches them from the
        server. Files are stored in the local cache.
        A list of nodes that the server couldn't find is returned.
        If the connection fails, an exception is raised.
        """
        if not self.remotecache.connected:
            self.connect()
        cache = self.remotecache
        writedata = self.writedata

        repo = self.repo
        total = len(fileids)
        request = "get\n%d\n" % total
        idmap = {}
        reponame = repo.name
        for file, id in fileids:
            fullid = getcachekey(reponame, file, id)
            if self.cacheprocesspasspath:
                request += file + '\0'
            request += fullid + "\n"
            idmap[fullid] = file

        cache.request(request)

        progress = self.ui.makeprogress(_('downloading'), total=total)
        progress.update(0)

        missed = []
        count = 0
        while True:
            missingid = cache.receiveline()
            if not missingid:
                missedset = set(missed)
                for missingid in idmap:
                    if not missingid in missedset:
                        missed.append(missingid)
                self.ui.warn(_("warning: cache connection closed early - " +
                               "falling back to server\n"))
                break
            if missingid == "0":
                break
            if missingid.startswith("_hits_"):
                # receive progress reports
                parts = missingid.split("_")
                count += int(parts[2])
                progress.update(count)
                continue

            missed.append(missingid)

        global fetchmisses
        fetchmisses += len(missed)

        count = [total - len(missed)]
        fromcache = count[0]
        progress.update(count[0], total=total)
        self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
                    count[0], total, hit=count[0], total=total)

        oldumask = os.umask(0o002)
        try:
            # receive cache misses from master
            if missed:
                def progresstick():
                    count[0] += 1
                    progress.update(count[0])
                # When verbose is true, sshpeer prints 'running ssh...'
                # to stdout, which can interfere with some command
                # outputs
                verbose = self.ui.verbose
                self.ui.verbose = False
                try:
                    with self._connect() as conn:
                        remote = conn.peer
                        if remote.capable(
                                constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
                            if not isinstance(remote, _sshv1peer):
                                raise error.Abort('remotefilelog requires ssh '
                                                  'servers')
                            step = self.ui.configint('remotefilelog',
                                                     'getfilesstep')
                            getfilestype = self.ui.config('remotefilelog',
                                                          'getfilestype')
                            if getfilestype == 'threaded':
                                _getfiles = _getfiles_threaded
                            else:
                                _getfiles = _getfiles_optimistic
                            _getfiles(remote, self.receivemissing, progresstick,
                                      missed, idmap, step)
                        elif remote.capable("x_rfl_getfile"):
                            if remote.capable('batch'):
                                batchdefault = 100
                            else:
                                batchdefault = 10
                            batchsize = self.ui.configint(
                                'remotefilelog', 'batchsize', batchdefault)
                            _getfilesbatch(
                                remote, self.receivemissing, progresstick,
                                missed, idmap, batchsize)
                        else:
                            raise error.Abort("configured remotefilelog server"
                                              " does not support remotefilelog")

                    self.ui.log("remotefilefetchlog",
                                "Success\n",
                                fetched_files = count[0] - fromcache,
                                total_to_fetch = total - fromcache)
                except Exception:
                    self.ui.log("remotefilefetchlog",
                                "Fail\n",
                                fetched_files = count[0] - fromcache,
                                total_to_fetch = total - fromcache)
                    raise
                finally:
                    self.ui.verbose = verbose
                # send to memcache
-                count[0] = len(missed)
-                request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
+                request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed))
                cache.request(request)

            progress.complete()

            # mark ourselves as a user of this cache
            writedata.markrepo(self.repo.path)
        finally:
            os.umask(oldumask)

    def receivemissing(self, pipe, filename, node):
        line = pipe.readline()[:-1]
        if not line:
            raise error.ResponseError(_("error downloading file contents:"),
                                      _("connection closed early"))
        size = int(line)
        data = pipe.read(size)
        if len(data) != size:
            raise error.ResponseError(_("error downloading file contents:"),
                                      _("only received %s of %s bytes")
                                      % (len(data), size))

        self.writedata.addremotefilelognode(filename, bin(node),
                                            zlib.decompress(data))

    def connect(self):
        if self.cacheprocess:
            cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
            self.remotecache.connect(cmd)
        else:
            # If no cache process is specified, we fake one that always
            # returns cache misses. This enables tests to run easily
            # and may eventually allow us to be a drop in replacement
            # for the largefiles extension.
            class simplecache(object):
                def __init__(self):
                    self.missingids = []
                    self.connected = True

                def close(self):
                    pass

                def request(self, value, flush=True):
                    lines = value.split("\n")
                    if lines[0] != "get":
                        return
                    self.missingids = lines[2:-1]
                    self.missingids.append('0')

                def receiveline(self):
                    if len(self.missingids) > 0:
                        return self.missingids.pop(0)
                    return None

            self.remotecache = simplecache()

    def close(self):
        if fetches:
            msg = ("%d files fetched over %d fetches - " +
                   "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
                       fetched,
                       fetches,
                       fetchmisses,
                       float(fetched - fetchmisses) / float(fetched) * 100.0,
                       fetchcost)
            if self.debugoutput:
                self.ui.warn(msg)
            self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
                        remotefilelogfetched=fetched,
                        remotefilelogfetches=fetches,
                        remotefilelogfetchmisses=fetchmisses,
                        remotefilelogfetchtime=fetchcost * 1000)

        if self.remotecache.connected:
            self.remotecache.close()

    def prefetch(self, fileids, force=False, fetchdata=True,
                 fetchhistory=False):
        """downloads the given file versions to the cache
        """
        repo = self.repo
        idstocheck = []
        for file, id in fileids:
            # hack
            # - we don't use .hgtags
            # - workingctx produces ids with length 42,
            #   which we skip since they aren't in any cache
            if (file == '.hgtags' or len(id) == 42
                or not repo.shallowmatch(file)):
                continue

            idstocheck.append((file, bin(id)))

        datastore = self.datastore
        historystore = self.historystore
        if force:
            datastore = contentstore.unioncontentstore(*repo.shareddatastores)
            historystore = metadatastore.unionmetadatastore(
                *repo.sharedhistorystores)

        missingids = set()
        if fetchdata:
            missingids.update(datastore.getmissing(idstocheck))
        if fetchhistory:
            missingids.update(historystore.getmissing(idstocheck))

        # partition missing nodes into nullid and not-nullid so we can
        # warn about this filtering potentially shadowing bugs.
        nullids = len([None for unused, id in missingids if id == nullid])
        if nullids:
            missingids = [(f, id) for f, id in missingids if id != nullid]
            repo.ui.develwarn(
                ('remotefilelog not fetching %d null revs'
                 ' - this is likely hiding bugs' % nullids),
                config='remotefilelog-ext')
        if missingids:
            global fetches, fetched, fetchcost
            fetches += 1

            # We want to be able to detect excess individual file downloads, so
            # let's log that information for debugging.
            if fetches >= 15 and fetches < 18:
                if fetches == 15:
                    fetchwarning = self.ui.config('remotefilelog',
                                                  'fetchwarning')
                    if fetchwarning:
                        self.ui.warn(fetchwarning + '\n')
                    self.logstacktrace()
            missingids = [(file, hex(id)) for file, id in missingids]
            fetched += len(missingids)
            start = time.time()
            missingids = self.request(missingids)
            if missingids:
                raise error.Abort(_("unable to download %d files") %
                                  len(missingids))
            fetchcost += time.time() - start
            self._lfsprefetch(fileids)

    def _lfsprefetch(self, fileids):
        if not _lfsmod or not util.safehasattr(
                self.repo.svfs, 'lfslocalblobstore'):
            return
        if not _lfsmod.wrapper.candownload(self.repo):
            return
        pointers = []
        store = self.repo.svfs.lfslocalblobstore
        for file, id in fileids:
            node = bin(id)
            rlog = self.repo.file(file)
            if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                text = rlog.revision(node, raw=True)
                p = _lfsmod.pointer.deserialize(text)
                oid = p.oid()
                if not store.has(oid):
                    pointers.append(p)
        if len(pointers) > 0:
            self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
            assert all(store.has(p.oid()) for p in pointers)

    def logstacktrace(self):
        import traceback
        self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
                    ''.join(traceback.format_stack()))
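For context, here is a minimal sketch (assumptions mine, not part of this commit) of the cacheprocess wire protocol that request() speaks above: the client writes a "get" request listing cache keys, and the process replies with one line per missing key, optional "_hits_<n>_" progress lines, and a terminating "0" line; "set" requests receive no reply. The stand-in below behaves like the simplecache fallback, reporting every key as a miss:

def fake_cacheprocess(request):
    # Emulates a cache process with an empty cache, like simplecache above.
    lines = request.split("\n")
    if lines[0] != "get":
        return []              # "set" requests get no reply lines
    misses = lines[2:-1]       # skip the "get" verb and the count line
    return misses + ["0"]      # every key missed, then the terminator

request = "get\n2\nmyrepo/ab/cdef0/123456\nmyrepo/12/34567/abcdef\n"
for line in fake_cacheprocess(request):
    print(line)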