remotefilelog: use progress helper in fileserverclient...
Martin von Zweigbergk
r40881:e58cd7ed default
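The substance of the change: the module-level _downloading topic and the scattered ui.progress(topic, pos, total=...) calls are replaced with a progress helper object obtained from ui.makeprogress(), which owns its topic and total and is explicitly finished with complete(). A minimal sketch of the pattern, using only the calls visible in the diff below; the fetchall wrapper and fetch step are illustrative, not code from fileserverclient.py:

from mercurial.i18n import _

def fetchall(ui, items):
    # One helper per operation replaces the shared module-level topic.
    progress = ui.makeprogress(_('downloading'), total=len(items))
    progress.update(0)
    for i, item in enumerate(items):
        fetch(item)  # hypothetical fetch step
        # was: ui.progress(_downloading, i + 1, total=len(items))
        progress.update(i + 1)
    # was: ui.progress(_downloading, None)
    progress.complete()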
@@ -1,589 +1,589 @@
 # fileserverclient.py - client for communicating with the cache process
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 import hashlib
 import io
 import os
 import threading
 import time
 import zlib

 from mercurial.i18n import _
 from mercurial.node import bin, hex, nullid
 from mercurial import (
     error,
     node,
     pycompat,
     revlog,
     sshpeer,
     util,
     wireprotov1peer,
 )
 from mercurial.utils import procutil

 from . import (
     constants,
     contentstore,
     metadatastore,
 )

 _sshv1peer = sshpeer.sshv1peer

 # Statistics for debugging
 fetchcost = 0
 fetches = 0
 fetched = 0
 fetchmisses = 0

 _lfsmod = None
-_downloading = _('downloading')

 def getcachekey(reponame, file, id):
     pathhash = node.hex(hashlib.sha1(file).digest())
     return os.path.join(reponame, pathhash[:2], pathhash[2:], id)

 def getlocalkey(file, id):
     pathhash = node.hex(hashlib.sha1(file).digest())
     return os.path.join(pathhash, id)

 def peersetup(ui, peer):

     class remotefilepeer(peer.__class__):
         @wireprotov1peer.batchable
         def x_rfl_getfile(self, file, node):
             if not self.capable('x_rfl_getfile'):
                 raise error.Abort(
                     'configured remotefile server does not support getfile')
             f = wireprotov1peer.future()
             yield {'file': file, 'node': node}, f
             code, data = f.value.split('\0', 1)
             if int(code):
                 raise error.LookupError(file, node, data)
             yield data

         @wireprotov1peer.batchable
         def x_rfl_getflogheads(self, path):
             if not self.capable('x_rfl_getflogheads'):
                 raise error.Abort('configured remotefile server does not '
                                   'support getflogheads')
             f = wireprotov1peer.future()
             yield {'path': path}, f
             heads = f.value.split('\n') if f.value else []
             yield heads

         def _updatecallstreamopts(self, command, opts):
             if command != 'getbundle':
                 return
             if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                 not in self.capabilities()):
                 return
             if not util.safehasattr(self, '_localrepo'):
                 return
             if (constants.SHALLOWREPO_REQUIREMENT
                 not in self._localrepo.requirements):
                 return

             bundlecaps = opts.get('bundlecaps')
             if bundlecaps:
                 bundlecaps = [bundlecaps]
             else:
                 bundlecaps = []

             # shallow, includepattern, and excludepattern are a hacky way of
             # carrying over data from the local repo to this getbundle
             # command. We need to do it this way because bundle1 getbundle
             # doesn't provide any other place we can hook in to manipulate
             # getbundle args before it goes across the wire. Once we get rid
             # of bundle1, we can use bundle2's _pullbundle2extraprepare to
             # do this more cleanly.
             bundlecaps.append(constants.BUNDLE2_CAPABLITY)
             if self._localrepo.includepattern:
                 patterns = '\0'.join(self._localrepo.includepattern)
                 includecap = "includepattern=" + patterns
                 bundlecaps.append(includecap)
             if self._localrepo.excludepattern:
                 patterns = '\0'.join(self._localrepo.excludepattern)
                 excludecap = "excludepattern=" + patterns
                 bundlecaps.append(excludecap)
             opts['bundlecaps'] = ','.join(bundlecaps)

         def _sendrequest(self, command, args, **opts):
             self._updatecallstreamopts(command, args)
             return super(remotefilepeer, self)._sendrequest(command, args,
                                                             **opts)

         def _callstream(self, command, **opts):
             supertype = super(remotefilepeer, self)
             if not util.safehasattr(supertype, '_sendrequest'):
                 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
             return super(remotefilepeer, self)._callstream(command, **opts)

     peer.__class__ = remotefilepeer

 class cacheconnection(object):
     """The connection for communicating with the remote cache. Performs
     gets and sets by communicating with an external process that has the
     cache-specific implementation.
     """
     def __init__(self):
         self.pipeo = self.pipei = self.pipee = None
         self.subprocess = None
         self.connected = False

     def connect(self, cachecommand):
         if self.pipeo:
             raise error.Abort(_("cache connection already open"))
         self.pipei, self.pipeo, self.pipee, self.subprocess = \
             procutil.popen4(cachecommand)
         self.connected = True

     def close(self):
         def tryclose(pipe):
             try:
                 pipe.close()
             except Exception:
                 pass
         if self.connected:
             try:
                 self.pipei.write("exit\n")
             except Exception:
                 pass
             tryclose(self.pipei)
             self.pipei = None
             tryclose(self.pipeo)
             self.pipeo = None
             tryclose(self.pipee)
             self.pipee = None
             try:
                 # Wait for process to terminate, making sure to avoid deadlock.
                 # See https://docs.python.org/2/library/subprocess.html for
                 # warnings about wait() and deadlocking.
                 self.subprocess.communicate()
             except Exception:
                 pass
             self.subprocess = None
         self.connected = False

     def request(self, request, flush=True):
         if self.connected:
             try:
                 self.pipei.write(request)
                 if flush:
                     self.pipei.flush()
             except IOError:
                 self.close()

     def receiveline(self):
         if not self.connected:
             return None
         try:
             result = self.pipeo.readline()[:-1]
             if not result:
                 self.close()
         except IOError:
             self.close()

         return result

 def _getfilesbatch(
         remote, receivemissing, progresstick, missed, idmap, batchsize):
     # Over http(s), iterbatch is a streamy method and we can start
     # looking at results early. This means we send one (potentially
     # large) request, but then we show nice progress as we process
     # file results, rather than showing chunks of $batchsize in
     # progress.
     #
     # Over ssh, iterbatch isn't streamy because batch() wasn't
     # explicitly designed as a streaming method. In the future we
     # should probably introduce a streambatch() method upstream and
     # use that for this.
     with remote.commandexecutor() as e:
         futures = []
         for m in missed:
             futures.append(e.callcommand('x_rfl_getfile', {
                 'file': idmap[m],
                 'node': m[-40:]
             }))

         for i, m in enumerate(missed):
             r = futures[i].result()
             futures[i] = None # release memory
             file_ = idmap[m]
             node = m[-40:]
             receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
             progresstick()

 def _getfiles_optimistic(
     remote, receivemissing, progresstick, missed, idmap, step):
     remote._callstream("x_rfl_getfiles")
     i = 0
     pipeo = remote._pipeo
     pipei = remote._pipei
     while i < len(missed):
         # issue a batch of requests
         start = i
         end = min(len(missed), start + step)
         i = end
         for missingid in missed[start:end]:
             # issue new request
             versionid = missingid[-40:]
             file = idmap[missingid]
             sshrequest = "%s%s\n" % (versionid, file)
             pipeo.write(sshrequest)
         pipeo.flush()

         # receive batch results
         for missingid in missed[start:end]:
             versionid = missingid[-40:]
             file = idmap[missingid]
             receivemissing(pipei, file, versionid)
             progresstick()

     # End the command
     pipeo.write('\n')
     pipeo.flush()

 def _getfiles_threaded(
     remote, receivemissing, progresstick, missed, idmap, step):
     remote._callstream("getfiles")
     pipeo = remote._pipeo
     pipei = remote._pipei

     def writer():
         for missingid in missed:
             versionid = missingid[-40:]
             file = idmap[missingid]
             sshrequest = "%s%s\n" % (versionid, file)
             pipeo.write(sshrequest)
         pipeo.flush()
     writerthread = threading.Thread(target=writer)
     writerthread.daemon = True
     writerthread.start()

     for missingid in missed:
         versionid = missingid[-40:]
         file = idmap[missingid]
         receivemissing(pipei, file, versionid)
         progresstick()

     writerthread.join()
     # End the command
     pipeo.write('\n')
     pipeo.flush()

 class fileserverclient(object):
     """A client for requesting files from the remote file server.
     """
     def __init__(self, repo):
         ui = repo.ui
         self.repo = repo
         self.ui = ui
         self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
         if self.cacheprocess:
             self.cacheprocess = util.expandpath(self.cacheprocess)

         # This option causes remotefilelog to pass the full file path to the
         # cacheprocess instead of a hashed key.
         self.cacheprocesspasspath = ui.configbool(
             "remotefilelog", "cacheprocess.includepath")

         self.debugoutput = ui.configbool("remotefilelog", "debug")

         self.remotecache = cacheconnection()

     def setstore(self, datastore, historystore, writedata, writehistory):
         self.datastore = datastore
         self.historystore = historystore
         self.writedata = writedata
         self.writehistory = writehistory

     def _connect(self):
         return self.repo.connectionpool.get(self.repo.fallbackpath)

     def request(self, fileids):
         """Takes a list of filename/node pairs and fetches them from the
         server. Files are stored in the local cache.
         A list of nodes that the server couldn't find is returned.
         If the connection fails, an exception is raised.
         """
         if not self.remotecache.connected:
             self.connect()
         cache = self.remotecache
         writedata = self.writedata

         repo = self.repo
         count = len(fileids)
         request = "get\n%d\n" % count
         idmap = {}
         reponame = repo.name
         for file, id in fileids:
             fullid = getcachekey(reponame, file, id)
             if self.cacheprocesspasspath:
                 request += file + '\0'
             request += fullid + "\n"
             idmap[fullid] = file

         cache.request(request)

         total = count
-        self.ui.progress(_downloading, 0, total=count)
+        progress = self.ui.makeprogress(_('downloading'), total=count)
+        progress.update(0)

         missed = []
         count = 0
         while True:
             missingid = cache.receiveline()
             if not missingid:
                 missedset = set(missed)
                 for missingid in idmap:
                     if not missingid in missedset:
                         missed.append(missingid)
                 self.ui.warn(_("warning: cache connection closed early - " +
                                "falling back to server\n"))
                 break
             if missingid == "0":
                 break
             if missingid.startswith("_hits_"):
                 # receive progress reports
                 parts = missingid.split("_")
                 count += int(parts[2])
-                self.ui.progress(_downloading, count, total=total)
+                progress.update(count)
                 continue

             missed.append(missingid)

         global fetchmisses
         fetchmisses += len(missed)

         count = [total - len(missed)]
         fromcache = count[0]
-        self.ui.progress(_downloading, count[0], total=total)
+        progress.update(count[0], total=total)
         self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
                     count[0], total, hit=count[0], total=total)

         oldumask = os.umask(0o002)
         try:
             # receive cache misses from master
             if missed:
                 def progresstick():
                     count[0] += 1
-                    self.ui.progress(_downloading, count[0], total=total)
+                    progress.update(count[0])
                 # When verbose is true, sshpeer prints 'running ssh...'
                 # to stdout, which can interfere with some command
                 # outputs
                 verbose = self.ui.verbose
                 self.ui.verbose = False
                 try:
                     with self._connect() as conn:
                         remote = conn.peer
                         if remote.capable(
                             constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
                             if not isinstance(remote, _sshv1peer):
                                 raise error.Abort('remotefilelog requires ssh '
                                                   'servers')
                             step = self.ui.configint('remotefilelog',
                                                      'getfilesstep')
                             getfilestype = self.ui.config('remotefilelog',
                                                           'getfilestype')
                             if getfilestype == 'threaded':
                                 _getfiles = _getfiles_threaded
                             else:
                                 _getfiles = _getfiles_optimistic
                             _getfiles(remote, self.receivemissing, progresstick,
                                       missed, idmap, step)
                         elif remote.capable("x_rfl_getfile"):
                             if remote.capable('batch'):
                                 batchdefault = 100
                             else:
                                 batchdefault = 10
                             batchsize = self.ui.configint(
                                 'remotefilelog', 'batchsize', batchdefault)
                             _getfilesbatch(
                                 remote, self.receivemissing, progresstick,
                                 missed, idmap, batchsize)
                         else:
                             raise error.Abort("configured remotefilelog server"
                                               " does not support remotefilelog")

                     self.ui.log("remotefilefetchlog",
                                 "Success\n",
                                 fetched_files = count[0] - fromcache,
                                 total_to_fetch = total - fromcache)
                 except Exception:
                     self.ui.log("remotefilefetchlog",
                                 "Fail\n",
                                 fetched_files = count[0] - fromcache,
                                 total_to_fetch = total - fromcache)
                     raise
                 finally:
                     self.ui.verbose = verbose
                 # send to memcache
                 count[0] = len(missed)
                 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
                 cache.request(request)

-            self.ui.progress(_downloading, None)
+            progress.complete()

             # mark ourselves as a user of this cache
             writedata.markrepo(self.repo.path)
         finally:
             os.umask(oldumask)

     def receivemissing(self, pipe, filename, node):
         line = pipe.readline()[:-1]
         if not line:
             raise error.ResponseError(_("error downloading file contents:"),
                                       _("connection closed early"))
         size = int(line)
         data = pipe.read(size)
         if len(data) != size:
             raise error.ResponseError(_("error downloading file contents:"),
                                       _("only received %s of %s bytes")
                                       % (len(data), size))

         self.writedata.addremotefilelognode(filename, bin(node),
                                             zlib.decompress(data))

     def connect(self):
         if self.cacheprocess:
             cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
             self.remotecache.connect(cmd)
         else:
             # If no cache process is specified, we fake one that always
             # returns cache misses. This enables tests to run easily
             # and may eventually allow us to be a drop in replacement
             # for the largefiles extension.
             class simplecache(object):
                 def __init__(self):
                     self.missingids = []
                     self.connected = True

                 def close(self):
                     pass

                 def request(self, value, flush=True):
                     lines = value.split("\n")
                     if lines[0] != "get":
                         return
                     self.missingids = lines[2:-1]
                     self.missingids.append('0')

                 def receiveline(self):
                     if len(self.missingids) > 0:
                         return self.missingids.pop(0)
                     return None

             self.remotecache = simplecache()

     def close(self):
         if fetches:
             msg = ("%d files fetched over %d fetches - " +
                    "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
                        fetched,
                        fetches,
                        fetchmisses,
                        float(fetched - fetchmisses) / float(fetched) * 100.0,
                        fetchcost)
             if self.debugoutput:
                 self.ui.warn(msg)
             self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
                         remotefilelogfetched=fetched,
                         remotefilelogfetches=fetches,
                         remotefilelogfetchmisses=fetchmisses,
                         remotefilelogfetchtime=fetchcost * 1000)

         if self.remotecache.connected:
             self.remotecache.close()

     def prefetch(self, fileids, force=False, fetchdata=True,
                  fetchhistory=False):
         """downloads the given file versions to the cache
         """
         repo = self.repo
         idstocheck = []
         for file, id in fileids:
             # hack
             # - we don't use .hgtags
             # - workingctx produces ids with length 42,
             #   which we skip since they aren't in any cache
             if (file == '.hgtags' or len(id) == 42
                 or not repo.shallowmatch(file)):
                 continue

             idstocheck.append((file, bin(id)))

         datastore = self.datastore
         historystore = self.historystore
         if force:
             datastore = contentstore.unioncontentstore(*repo.shareddatastores)
             historystore = metadatastore.unionmetadatastore(
                 *repo.sharedhistorystores)

         missingids = set()
         if fetchdata:
             missingids.update(datastore.getmissing(idstocheck))
         if fetchhistory:
             missingids.update(historystore.getmissing(idstocheck))

         # partition missing nodes into nullid and not-nullid so we can
         # warn about this filtering potentially shadowing bugs.
         nullids = len([None for unused, id in missingids if id == nullid])
         if nullids:
             missingids = [(f, id) for f, id in missingids if id != nullid]
             repo.ui.develwarn(
                 ('remotefilelog not fetching %d null revs'
                  ' - this is likely hiding bugs' % nullids),
                 config='remotefilelog-ext')
         if missingids:
             global fetches, fetched, fetchcost
             fetches += 1

             # We want to be able to detect excess individual file downloads, so
             # let's log that information for debugging.
             if fetches >= 15 and fetches < 18:
                 if fetches == 15:
                     fetchwarning = self.ui.config('remotefilelog',
                                                   'fetchwarning')
                     if fetchwarning:
                         self.ui.warn(fetchwarning + '\n')
                 self.logstacktrace()
             missingids = [(file, hex(id)) for file, id in missingids]
             fetched += len(missingids)
             start = time.time()
             missingids = self.request(missingids)
             if missingids:
                 raise error.Abort(_("unable to download %d files") %
                                   len(missingids))
             fetchcost += time.time() - start
             self._lfsprefetch(fileids)

     def _lfsprefetch(self, fileids):
         if not _lfsmod or not util.safehasattr(
                 self.repo.svfs, 'lfslocalblobstore'):
             return
         if not _lfsmod.wrapper.candownload(self.repo):
             return
         pointers = []
         store = self.repo.svfs.lfslocalblobstore
         for file, id in fileids:
             node = bin(id)
             rlog = self.repo.file(file)
             if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                 text = rlog.revision(node, raw=True)
                 p = _lfsmod.pointer.deserialize(text)
                 oid = p.oid()
                 if not store.has(oid):
                     pointers.append(p)
         if len(pointers) > 0:
             self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
             assert all(store.has(p.oid()) for p in pointers)

     def logstacktrace(self):
         import traceback
         self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
                     ''.join(traceback.format_stack()))
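For context, the external cache process that cacheconnection talks to speaks a small line-oriented protocol visible in this file: it is launched as "<cacheprocess> <cachepath>", receives "get"/"set" commands framed as a command line, a count line, and one key per line, answers a "get" with the keys it is missing (optionally interleaved with "_hits_<n>_" progress lines) terminated by a line containing "0", and exits on "exit". A minimal stand-in sketch that reports every key as a miss, like the in-process simplecache fallback, assuming the default hashed-key mode (cacheprocess.includepath off); this is an illustration derived from the framing above, not a shipped tool:

#!/usr/bin/env python
import sys

def main():
    while True:
        cmd = sys.stdin.readline()
        if not cmd or cmd == "exit\n":
            break
        if cmd == "get\n":
            count = int(sys.stdin.readline())
            for _i in range(count):
                key = sys.stdin.readline().rstrip("\n")
                sys.stdout.write(key + "\n")  # echoing a key back reports a miss
            sys.stdout.write("0\n")           # a lone "0" terminates the reply
            sys.stdout.flush()
        elif cmd == "set\n":
            count = int(sys.stdin.readline())
            for _i in range(count):
                # each key names a file now present under <cachepath>;
                # a real cache process would upload it here
                sys.stdin.readline()

if __name__ == '__main__':
    main()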