remotefilelog: avoid temporarily using "count" variable as synonym for "total"...
Martin von Zweigbergk
r40882:b34b1b86 default
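The gist of the cleanup, condensed into a runnable sketch (the file list is a stand-in value, not real repo state): `count` used to be bound to the total number of file ids, copied into `total`, and only then reused as a running counter; after this change `total` is bound once and `count` is reserved for counting.

    # a minimal sketch of the rename; fileids is a stand-in for the real list
    fileids = [('foo.txt', 'a' * 40), ('bar.txt', 'b' * 40)]

    # before: "count" first means "total", and is reused as a counter later
    count = len(fileids)
    request_old = "get\n%d\n" % count

    # after: "total" is bound once and keeps that meaning for its lifetime
    total = len(fileids)
    request_new = "get\n%d\n" % total

    assert request_old == request_new == "get\n2\n"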
@@ -1,589 +1,588 @@
 # fileserverclient.py - client for communicating with the cache process
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 import hashlib
 import io
 import os
 import threading
 import time
 import zlib

 from mercurial.i18n import _
 from mercurial.node import bin, hex, nullid
 from mercurial import (
     error,
     node,
     pycompat,
     revlog,
     sshpeer,
     util,
     wireprotov1peer,
 )
 from mercurial.utils import procutil

 from . import (
     constants,
     contentstore,
     metadatastore,
 )

 _sshv1peer = sshpeer.sshv1peer

 # Statistics for debugging
 fetchcost = 0
 fetches = 0
 fetched = 0
 fetchmisses = 0

 _lfsmod = None

 def getcachekey(reponame, file, id):
     pathhash = node.hex(hashlib.sha1(file).digest())
     return os.path.join(reponame, pathhash[:2], pathhash[2:], id)

 def getlocalkey(file, id):
     pathhash = node.hex(hashlib.sha1(file).digest())
     return os.path.join(pathhash, id)

 def peersetup(ui, peer):

     class remotefilepeer(peer.__class__):
         @wireprotov1peer.batchable
         def x_rfl_getfile(self, file, node):
             if not self.capable('x_rfl_getfile'):
                 raise error.Abort(
                     'configured remotefile server does not support getfile')
             f = wireprotov1peer.future()
             yield {'file': file, 'node': node}, f
             code, data = f.value.split('\0', 1)
             if int(code):
                 raise error.LookupError(file, node, data)
             yield data

         @wireprotov1peer.batchable
         def x_rfl_getflogheads(self, path):
             if not self.capable('x_rfl_getflogheads'):
                 raise error.Abort('configured remotefile server does not '
                                   'support getflogheads')
             f = wireprotov1peer.future()
             yield {'path': path}, f
             heads = f.value.split('\n') if f.value else []
             yield heads

         def _updatecallstreamopts(self, command, opts):
             if command != 'getbundle':
                 return
             if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                 not in self.capabilities()):
                 return
             if not util.safehasattr(self, '_localrepo'):
                 return
             if (constants.SHALLOWREPO_REQUIREMENT
                 not in self._localrepo.requirements):
                 return

             bundlecaps = opts.get('bundlecaps')
             if bundlecaps:
                 bundlecaps = [bundlecaps]
             else:
                 bundlecaps = []

             # shallow, includepattern, and excludepattern are a hacky way of
             # carrying over data from the local repo to this getbundle
             # command. We need to do it this way because bundle1 getbundle
             # doesn't provide any other place we can hook in to manipulate
             # getbundle args before it goes across the wire. Once we get rid
             # of bundle1, we can use bundle2's _pullbundle2extraprepare to
             # do this more cleanly.
             bundlecaps.append(constants.BUNDLE2_CAPABLITY)
             if self._localrepo.includepattern:
                 patterns = '\0'.join(self._localrepo.includepattern)
                 includecap = "includepattern=" + patterns
                 bundlecaps.append(includecap)
             if self._localrepo.excludepattern:
                 patterns = '\0'.join(self._localrepo.excludepattern)
                 excludecap = "excludepattern=" + patterns
                 bundlecaps.append(excludecap)
             opts['bundlecaps'] = ','.join(bundlecaps)

         def _sendrequest(self, command, args, **opts):
             self._updatecallstreamopts(command, args)
             return super(remotefilepeer, self)._sendrequest(command, args,
                                                             **opts)

         def _callstream(self, command, **opts):
             supertype = super(remotefilepeer, self)
             if not util.safehasattr(supertype, '_sendrequest'):
                 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
             return super(remotefilepeer, self)._callstream(command, **opts)

     peer.__class__ = remotefilepeer

 class cacheconnection(object):
     """The connection for communicating with the remote cache. Performs
     gets and sets by communicating with an external process that has the
     cache-specific implementation.
     """
     def __init__(self):
         self.pipeo = self.pipei = self.pipee = None
         self.subprocess = None
         self.connected = False

     def connect(self, cachecommand):
         if self.pipeo:
             raise error.Abort(_("cache connection already open"))
         self.pipei, self.pipeo, self.pipee, self.subprocess = \
             procutil.popen4(cachecommand)
         self.connected = True

     def close(self):
         def tryclose(pipe):
             try:
                 pipe.close()
             except Exception:
                 pass
         if self.connected:
             try:
                 self.pipei.write("exit\n")
             except Exception:
                 pass
             tryclose(self.pipei)
             self.pipei = None
             tryclose(self.pipeo)
             self.pipeo = None
             tryclose(self.pipee)
             self.pipee = None
             try:
                 # Wait for process to terminate, making sure to avoid deadlock.
                 # See https://docs.python.org/2/library/subprocess.html for
                 # warnings about wait() and deadlocking.
                 self.subprocess.communicate()
             except Exception:
                 pass
             self.subprocess = None
             self.connected = False

     def request(self, request, flush=True):
         if self.connected:
             try:
                 self.pipei.write(request)
                 if flush:
                     self.pipei.flush()
             except IOError:
                 self.close()

     def receiveline(self):
         if not self.connected:
             return None
         try:
             result = self.pipeo.readline()[:-1]
             if not result:
                 self.close()
         except IOError:
             self.close()

         return result

 def _getfilesbatch(
         remote, receivemissing, progresstick, missed, idmap, batchsize):
     # Over http(s), iterbatch is a streamy method and we can start
     # looking at results early. This means we send one (potentially
     # large) request, but then we show nice progress as we process
     # file results, rather than showing chunks of $batchsize in
     # progress.
     #
     # Over ssh, iterbatch isn't streamy because batch() wasn't
     # explicitly designed as a streaming method. In the future we
     # should probably introduce a streambatch() method upstream and
     # use that for this.
     with remote.commandexecutor() as e:
         futures = []
         for m in missed:
             futures.append(e.callcommand('x_rfl_getfile', {
                 'file': idmap[m],
                 'node': m[-40:]
             }))

         for i, m in enumerate(missed):
             r = futures[i].result()
             futures[i] = None  # release memory
             file_ = idmap[m]
             node = m[-40:]
             receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
             progresstick()

 def _getfiles_optimistic(
         remote, receivemissing, progresstick, missed, idmap, step):
     remote._callstream("x_rfl_getfiles")
     i = 0
     pipeo = remote._pipeo
     pipei = remote._pipei
     while i < len(missed):
         # issue a batch of requests
         start = i
         end = min(len(missed), start + step)
         i = end
         for missingid in missed[start:end]:
             # issue new request
             versionid = missingid[-40:]
             file = idmap[missingid]
             sshrequest = "%s%s\n" % (versionid, file)
             pipeo.write(sshrequest)
         pipeo.flush()

         # receive batch results
         for missingid in missed[start:end]:
             versionid = missingid[-40:]
             file = idmap[missingid]
             receivemissing(pipei, file, versionid)
             progresstick()

     # End the command
     pipeo.write('\n')
     pipeo.flush()

 def _getfiles_threaded(
         remote, receivemissing, progresstick, missed, idmap, step):
     remote._callstream("getfiles")
     pipeo = remote._pipeo
     pipei = remote._pipei

     def writer():
         for missingid in missed:
             versionid = missingid[-40:]
             file = idmap[missingid]
             sshrequest = "%s%s\n" % (versionid, file)
             pipeo.write(sshrequest)
         pipeo.flush()
     writerthread = threading.Thread(target=writer)
     writerthread.daemon = True
     writerthread.start()

     for missingid in missed:
         versionid = missingid[-40:]
         file = idmap[missingid]
         receivemissing(pipei, file, versionid)
         progresstick()

     writerthread.join()
     # End the command
     pipeo.write('\n')
     pipeo.flush()

 class fileserverclient(object):
     """A client for requesting files from the remote file server.
     """
     def __init__(self, repo):
         ui = repo.ui
         self.repo = repo
         self.ui = ui
         self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
         if self.cacheprocess:
             self.cacheprocess = util.expandpath(self.cacheprocess)

         # This option causes remotefilelog to pass the full file path to the
         # cacheprocess instead of a hashed key.
         self.cacheprocesspasspath = ui.configbool(
             "remotefilelog", "cacheprocess.includepath")

         self.debugoutput = ui.configbool("remotefilelog", "debug")

         self.remotecache = cacheconnection()

     def setstore(self, datastore, historystore, writedata, writehistory):
         self.datastore = datastore
         self.historystore = historystore
         self.writedata = writedata
         self.writehistory = writehistory

     def _connect(self):
         return self.repo.connectionpool.get(self.repo.fallbackpath)

     def request(self, fileids):
         """Takes a list of filename/node pairs and fetches them from the
         server. Files are stored in the local cache.
         A list of nodes that the server couldn't find is returned.
         If the connection fails, an exception is raised.
         """
         if not self.remotecache.connected:
             self.connect()
         cache = self.remotecache
         writedata = self.writedata

         repo = self.repo
-        count = len(fileids)
-        request = "get\n%d\n" % count
+        total = len(fileids)
+        request = "get\n%d\n" % total
         idmap = {}
         reponame = repo.name
         for file, id in fileids:
             fullid = getcachekey(reponame, file, id)
             if self.cacheprocesspasspath:
                 request += file + '\0'
             request += fullid + "\n"
             idmap[fullid] = file

         cache.request(request)

-        total = count
-        progress = self.ui.makeprogress(_('downloading'), total=count)
+        progress = self.ui.makeprogress(_('downloading'), total=total)
         progress.update(0)

         missed = []
         count = 0
         while True:
             missingid = cache.receiveline()
             if not missingid:
                 missedset = set(missed)
                 for missingid in idmap:
                     if not missingid in missedset:
                         missed.append(missingid)
                 self.ui.warn(_("warning: cache connection closed early - " +
                                "falling back to server\n"))
                 break
             if missingid == "0":
                 break
             if missingid.startswith("_hits_"):
                 # receive progress reports
                 parts = missingid.split("_")
                 count += int(parts[2])
                 progress.update(count)
                 continue

             missed.append(missingid)

         global fetchmisses
         fetchmisses += len(missed)

         count = [total - len(missed)]
         fromcache = count[0]
         progress.update(count[0], total=total)
         self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
                     count[0], total, hit=count[0], total=total)

         oldumask = os.umask(0o002)
         try:
             # receive cache misses from master
             if missed:
                 def progresstick():
                     count[0] += 1
                     progress.update(count[0])
                 # When verbose is true, sshpeer prints 'running ssh...'
                 # to stdout, which can interfere with some command
                 # outputs
                 verbose = self.ui.verbose
                 self.ui.verbose = False
                 try:
                     with self._connect() as conn:
                         remote = conn.peer
                         if remote.capable(
                             constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
                             if not isinstance(remote, _sshv1peer):
                                 raise error.Abort('remotefilelog requires ssh '
                                                   'servers')
                             step = self.ui.configint('remotefilelog',
                                                      'getfilesstep')
                             getfilestype = self.ui.config('remotefilelog',
                                                           'getfilestype')
                             if getfilestype == 'threaded':
                                 _getfiles = _getfiles_threaded
                             else:
                                 _getfiles = _getfiles_optimistic
                             _getfiles(remote, self.receivemissing, progresstick,
                                       missed, idmap, step)
                         elif remote.capable("x_rfl_getfile"):
                             if remote.capable('batch'):
                                 batchdefault = 100
                             else:
                                 batchdefault = 10
                             batchsize = self.ui.configint(
                                 'remotefilelog', 'batchsize', batchdefault)
                             _getfilesbatch(
                                 remote, self.receivemissing, progresstick,
                                 missed, idmap, batchsize)
                         else:
                             raise error.Abort("configured remotefilelog server"
                                               " does not support remotefilelog")

                     self.ui.log("remotefilefetchlog",
                                 "Success\n",
                                 fetched_files = count[0] - fromcache,
                                 total_to_fetch = total - fromcache)
                 except Exception:
                     self.ui.log("remotefilefetchlog",
                                 "Fail\n",
                                 fetched_files = count[0] - fromcache,
                                 total_to_fetch = total - fromcache)
                     raise
                 finally:
                     self.ui.verbose = verbose
                 # send to memcache
                 count[0] = len(missed)
                 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
                 cache.request(request)

             progress.complete()

             # mark ourselves as a user of this cache
             writedata.markrepo(self.repo.path)
         finally:
             os.umask(oldumask)

     def receivemissing(self, pipe, filename, node):
         line = pipe.readline()[:-1]
         if not line:
             raise error.ResponseError(_("error downloading file contents:"),
                                       _("connection closed early"))
         size = int(line)
         data = pipe.read(size)
         if len(data) != size:
             raise error.ResponseError(_("error downloading file contents:"),
                                       _("only received %s of %s bytes")
                                       % (len(data), size))

         self.writedata.addremotefilelognode(filename, bin(node),
                                             zlib.decompress(data))

     def connect(self):
         if self.cacheprocess:
             cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
             self.remotecache.connect(cmd)
         else:
             # If no cache process is specified, we fake one that always
             # returns cache misses. This enables tests to run easily
             # and may eventually allow us to be a drop in replacement
             # for the largefiles extension.
             class simplecache(object):
                 def __init__(self):
                     self.missingids = []
                     self.connected = True

                 def close(self):
                     pass

                 def request(self, value, flush=True):
                     lines = value.split("\n")
                     if lines[0] != "get":
                         return
                     self.missingids = lines[2:-1]
                     self.missingids.append('0')

                 def receiveline(self):
                     if len(self.missingids) > 0:
                         return self.missingids.pop(0)
                     return None

             self.remotecache = simplecache()

     def close(self):
         if fetches:
             msg = ("%d files fetched over %d fetches - " +
                    "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
                        fetched,
                        fetches,
                        fetchmisses,
                        float(fetched - fetchmisses) / float(fetched) * 100.0,
                        fetchcost)
             if self.debugoutput:
                 self.ui.warn(msg)
             self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
                         remotefilelogfetched=fetched,
                         remotefilelogfetches=fetches,
                         remotefilelogfetchmisses=fetchmisses,
                         remotefilelogfetchtime=fetchcost * 1000)

         if self.remotecache.connected:
             self.remotecache.close()

     def prefetch(self, fileids, force=False, fetchdata=True,
                  fetchhistory=False):
         """downloads the given file versions to the cache
         """
         repo = self.repo
         idstocheck = []
         for file, id in fileids:
             # hack
             # - we don't use .hgtags
             # - workingctx produces ids with length 42,
             #   which we skip since they aren't in any cache
             if (file == '.hgtags' or len(id) == 42
                 or not repo.shallowmatch(file)):
                 continue

             idstocheck.append((file, bin(id)))

         datastore = self.datastore
         historystore = self.historystore
         if force:
             datastore = contentstore.unioncontentstore(*repo.shareddatastores)
             historystore = metadatastore.unionmetadatastore(
                 *repo.sharedhistorystores)

         missingids = set()
         if fetchdata:
             missingids.update(datastore.getmissing(idstocheck))
         if fetchhistory:
             missingids.update(historystore.getmissing(idstocheck))

         # partition missing nodes into nullid and not-nullid so we can
         # warn about this filtering potentially shadowing bugs.
         nullids = len([None for unused, id in missingids if id == nullid])
         if nullids:
             missingids = [(f, id) for f, id in missingids if id != nullid]
             repo.ui.develwarn(
                 ('remotefilelog not fetching %d null revs'
                  ' - this is likely hiding bugs' % nullids),
                 config='remotefilelog-ext')
         if missingids:
             global fetches, fetched, fetchcost
             fetches += 1

             # We want to be able to detect excess individual file downloads, so
             # let's log that information for debugging.
             if fetches >= 15 and fetches < 18:
                 if fetches == 15:
                     fetchwarning = self.ui.config('remotefilelog',
                                                   'fetchwarning')
                     if fetchwarning:
                         self.ui.warn(fetchwarning + '\n')
                 self.logstacktrace()
             missingids = [(file, hex(id)) for file, id in missingids]
             fetched += len(missingids)
             start = time.time()
             missingids = self.request(missingids)
             if missingids:
                 raise error.Abort(_("unable to download %d files") %
                                   len(missingids))
             fetchcost += time.time() - start
             self._lfsprefetch(fileids)

     def _lfsprefetch(self, fileids):
         if not _lfsmod or not util.safehasattr(
                 self.repo.svfs, 'lfslocalblobstore'):
             return
         if not _lfsmod.wrapper.candownload(self.repo):
             return
         pointers = []
         store = self.repo.svfs.lfslocalblobstore
         for file, id in fileids:
             node = bin(id)
             rlog = self.repo.file(file)
             if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                 text = rlog.revision(node, raw=True)
                 p = _lfsmod.pointer.deserialize(text)
                 oid = p.oid()
                 if not store.has(oid):
                     pointers.append(p)
         if len(pointers) > 0:
             self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
             assert all(store.has(p.oid()) for p in pointers)

     def logstacktrace(self):
         import traceback
         self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
                     ''.join(traceback.format_stack()))
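A note on the @wireprotov1peer.batchable methods defined in peersetup(): each is a two-step generator that first yields its wire arguments together with an empty future, then resumes after the framework has filled the future and yields the decoded result. A toy driver showing just the shape of that protocol (this is not Mercurial's real batching machinery; "future" and "run" here are hypothetical stand-ins):

    # toy illustration of the two-yield protocol used by the batchable
    # methods above; nothing here touches the real wireprotov1peer API
    class future(object):
        value = None

    def getfile(file, node):
        f = future()
        yield {'file': file, 'node': node}, f  # step 1: emit args + future
        code, data = f.value.split('\0', 1)    # step 2: decode the reply
        if int(code):
            raise LookupError((file, node, data))
        yield data

    def run(gen, transport):
        args, f = next(gen)        # collect the request to batch
        f.value = transport(args)  # the framework fills in the raw reply
        return next(gen)           # resume the generator to decode it

    reply = run(getfile('foo', 'n1'),
                lambda args: '0\0contents of %s' % args['file'])
    assert reply == 'contents of foo'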
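The cacheprocess wire format, as far as it can be inferred from request() and cacheconnection above: the client writes "get\n<n>\n" followed by n cache keys, one per line (each prefixed with "<path>\0" when remotefilelog.cacheprocess.includepath is set), then reads back one line per missing key, interleaved with optional "_hits_<n>_" progress lines, until a terminating "0". After fetching the misses from the server it uploads them with "set\n<n>\n<keys...>", and writes "exit" before closing the pipes. A hypothetical stand-alone cacheprocess that misses every key, in the spirit of the in-process simplecache fake above:

    #!/usr/bin/env python3
    # hypothetical always-miss cacheprocess; the framing is inferred from
    # fileserverclient above, not taken from a published spec
    import sys

    def serve(stdin=sys.stdin, stdout=sys.stdout):
        while True:
            verb = stdin.readline().rstrip('\n')
            if not verb or verb == 'exit':
                return
            n = int(stdin.readline())
            keys = [stdin.readline().rstrip('\n') for _ in range(n)]
            if verb == 'get':
                for key in keys:
                    stdout.write(key + '\n')  # report every key as missing
                stdout.write('0\n')           # terminator request() waits for
                stdout.flush()
            # "set" announces keys the client just fetched; a real cache
            # would store them, this fake simply discards them

    if __name__ == '__main__':
        serve()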
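Finally, the reason count is rebound to a one-element list (count = [total - len(missed)]) rather than a plain integer: the nested progresstick() closure needs to update it, and this file still supports Python 2, which lacks nonlocal. Mutating a shared cell sidesteps the rebinding restriction. The idiom in isolation:

    # the mutable-cell idiom; on Python 3 alone, `nonlocal` would make
    # the list wrapper unnecessary
    def makeprogress(start):
        count = [start]            # one-element list shared with the closure
        def tick():
            count[0] += 1          # mutate the cell; no rebinding of `count`
            return count[0]
        return tick

    tick = makeprogress(40)
    tick()
    assert tick() == 42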