##// END OF EJS Templates
py3: fix a bytes vs str issue in remotefilelog extension...
Kyle Lippincott -
r44287:94670e12 default
parent child Browse files
Show More
@@ -1,667 +1,667 b''
1 # fileserverclient.py - client for communicating with the cache process
1 # fileserverclient.py - client for communicating with the cache process
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import io
11 import io
12 import os
12 import os
13 import threading
13 import threading
14 import time
14 import time
15 import zlib
15 import zlib
16
16
17 from mercurial.i18n import _
17 from mercurial.i18n import _
18 from mercurial.node import bin, hex, nullid
18 from mercurial.node import bin, hex, nullid
19 from mercurial import (
19 from mercurial import (
20 error,
20 error,
21 node,
21 node,
22 pycompat,
22 pycompat,
23 revlog,
23 revlog,
24 sshpeer,
24 sshpeer,
25 util,
25 util,
26 wireprotov1peer,
26 wireprotov1peer,
27 )
27 )
28 from mercurial.utils import procutil
28 from mercurial.utils import procutil
29
29
30 from . import (
30 from . import (
31 constants,
31 constants,
32 contentstore,
32 contentstore,
33 metadatastore,
33 metadatastore,
34 )
34 )
35
35
36 _sshv1peer = sshpeer.sshv1peer
36 _sshv1peer = sshpeer.sshv1peer
37
37
38 # Statistics for debugging
38 # Statistics for debugging
39 fetchcost = 0
39 fetchcost = 0
40 fetches = 0
40 fetches = 0
41 fetched = 0
41 fetched = 0
42 fetchmisses = 0
42 fetchmisses = 0
43
43
44 _lfsmod = None
44 _lfsmod = None
45
45
46
46
47 def getcachekey(reponame, file, id):
47 def getcachekey(reponame, file, id):
48 pathhash = node.hex(hashlib.sha1(file).digest())
48 pathhash = node.hex(hashlib.sha1(file).digest())
49 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
49 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
50
50
51
51
52 def getlocalkey(file, id):
52 def getlocalkey(file, id):
53 pathhash = node.hex(hashlib.sha1(file).digest())
53 pathhash = node.hex(hashlib.sha1(file).digest())
54 return os.path.join(pathhash, id)
54 return os.path.join(pathhash, id)
55
55
56
56
57 def peersetup(ui, peer):
57 def peersetup(ui, peer):
58 class remotefilepeer(peer.__class__):
58 class remotefilepeer(peer.__class__):
59 @wireprotov1peer.batchable
59 @wireprotov1peer.batchable
60 def x_rfl_getfile(self, file, node):
60 def x_rfl_getfile(self, file, node):
61 if not self.capable(b'x_rfl_getfile'):
61 if not self.capable(b'x_rfl_getfile'):
62 raise error.Abort(
62 raise error.Abort(
63 b'configured remotefile server does not support getfile'
63 b'configured remotefile server does not support getfile'
64 )
64 )
65 f = wireprotov1peer.future()
65 f = wireprotov1peer.future()
66 yield {b'file': file, b'node': node}, f
66 yield {b'file': file, b'node': node}, f
67 code, data = f.value.split(b'\0', 1)
67 code, data = f.value.split(b'\0', 1)
68 if int(code):
68 if int(code):
69 raise error.LookupError(file, node, data)
69 raise error.LookupError(file, node, data)
70 yield data
70 yield data
71
71
72 @wireprotov1peer.batchable
72 @wireprotov1peer.batchable
73 def x_rfl_getflogheads(self, path):
73 def x_rfl_getflogheads(self, path):
74 if not self.capable(b'x_rfl_getflogheads'):
74 if not self.capable(b'x_rfl_getflogheads'):
75 raise error.Abort(
75 raise error.Abort(
76 b'configured remotefile server does not '
76 b'configured remotefile server does not '
77 b'support getflogheads'
77 b'support getflogheads'
78 )
78 )
79 f = wireprotov1peer.future()
79 f = wireprotov1peer.future()
80 yield {b'path': path}, f
80 yield {b'path': path}, f
81 heads = f.value.split(b'\n') if f.value else []
81 heads = f.value.split(b'\n') if f.value else []
82 yield heads
82 yield heads
83
83
84 def _updatecallstreamopts(self, command, opts):
84 def _updatecallstreamopts(self, command, opts):
85 if command != b'getbundle':
85 if command != b'getbundle':
86 return
86 return
87 if (
87 if (
88 constants.NETWORK_CAP_LEGACY_SSH_GETFILES
88 constants.NETWORK_CAP_LEGACY_SSH_GETFILES
89 not in self.capabilities()
89 not in self.capabilities()
90 ):
90 ):
91 return
91 return
92 if not util.safehasattr(self, '_localrepo'):
92 if not util.safehasattr(self, '_localrepo'):
93 return
93 return
94 if (
94 if (
95 constants.SHALLOWREPO_REQUIREMENT
95 constants.SHALLOWREPO_REQUIREMENT
96 not in self._localrepo.requirements
96 not in self._localrepo.requirements
97 ):
97 ):
98 return
98 return
99
99
100 bundlecaps = opts.get(b'bundlecaps')
100 bundlecaps = opts.get(b'bundlecaps')
101 if bundlecaps:
101 if bundlecaps:
102 bundlecaps = [bundlecaps]
102 bundlecaps = [bundlecaps]
103 else:
103 else:
104 bundlecaps = []
104 bundlecaps = []
105
105
106 # shallow, includepattern, and excludepattern are a hacky way of
106 # shallow, includepattern, and excludepattern are a hacky way of
107 # carrying over data from the local repo to this getbundle
107 # carrying over data from the local repo to this getbundle
108 # command. We need to do it this way because bundle1 getbundle
108 # command. We need to do it this way because bundle1 getbundle
109 # doesn't provide any other place we can hook in to manipulate
109 # doesn't provide any other place we can hook in to manipulate
110 # getbundle args before it goes across the wire. Once we get rid
110 # getbundle args before it goes across the wire. Once we get rid
111 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
111 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
112 # do this more cleanly.
112 # do this more cleanly.
113 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
113 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
114 if self._localrepo.includepattern:
114 if self._localrepo.includepattern:
115 patterns = b'\0'.join(self._localrepo.includepattern)
115 patterns = b'\0'.join(self._localrepo.includepattern)
116 includecap = b"includepattern=" + patterns
116 includecap = b"includepattern=" + patterns
117 bundlecaps.append(includecap)
117 bundlecaps.append(includecap)
118 if self._localrepo.excludepattern:
118 if self._localrepo.excludepattern:
119 patterns = b'\0'.join(self._localrepo.excludepattern)
119 patterns = b'\0'.join(self._localrepo.excludepattern)
120 excludecap = b"excludepattern=" + patterns
120 excludecap = b"excludepattern=" + patterns
121 bundlecaps.append(excludecap)
121 bundlecaps.append(excludecap)
122 opts[b'bundlecaps'] = b','.join(bundlecaps)
122 opts[b'bundlecaps'] = b','.join(bundlecaps)
123
123
124 def _sendrequest(self, command, args, **opts):
124 def _sendrequest(self, command, args, **opts):
125 self._updatecallstreamopts(command, args)
125 self._updatecallstreamopts(command, args)
126 return super(remotefilepeer, self)._sendrequest(
126 return super(remotefilepeer, self)._sendrequest(
127 command, args, **opts
127 command, args, **opts
128 )
128 )
129
129
130 def _callstream(self, command, **opts):
130 def _callstream(self, command, **opts):
131 supertype = super(remotefilepeer, self)
131 supertype = super(remotefilepeer, self)
132 if not util.safehasattr(supertype, '_sendrequest'):
132 if not util.safehasattr(supertype, '_sendrequest'):
133 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
133 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
134 return super(remotefilepeer, self)._callstream(command, **opts)
134 return super(remotefilepeer, self)._callstream(command, **opts)
135
135
136 peer.__class__ = remotefilepeer
136 peer.__class__ = remotefilepeer
137
137
138
138
139 class cacheconnection(object):
139 class cacheconnection(object):
140 """The connection for communicating with the remote cache. Performs
140 """The connection for communicating with the remote cache. Performs
141 gets and sets by communicating with an external process that has the
141 gets and sets by communicating with an external process that has the
142 cache-specific implementation.
142 cache-specific implementation.
143 """
143 """
144
144
145 def __init__(self):
145 def __init__(self):
146 self.pipeo = self.pipei = self.pipee = None
146 self.pipeo = self.pipei = self.pipee = None
147 self.subprocess = None
147 self.subprocess = None
148 self.connected = False
148 self.connected = False
149
149
150 def connect(self, cachecommand):
150 def connect(self, cachecommand):
151 if self.pipeo:
151 if self.pipeo:
152 raise error.Abort(_(b"cache connection already open"))
152 raise error.Abort(_(b"cache connection already open"))
153 self.pipei, self.pipeo, self.pipee, self.subprocess = procutil.popen4(
153 self.pipei, self.pipeo, self.pipee, self.subprocess = procutil.popen4(
154 cachecommand
154 cachecommand
155 )
155 )
156 self.connected = True
156 self.connected = True
157
157
158 def close(self):
158 def close(self):
159 def tryclose(pipe):
159 def tryclose(pipe):
160 try:
160 try:
161 pipe.close()
161 pipe.close()
162 except Exception:
162 except Exception:
163 pass
163 pass
164
164
165 if self.connected:
165 if self.connected:
166 try:
166 try:
167 self.pipei.write(b"exit\n")
167 self.pipei.write(b"exit\n")
168 except Exception:
168 except Exception:
169 pass
169 pass
170 tryclose(self.pipei)
170 tryclose(self.pipei)
171 self.pipei = None
171 self.pipei = None
172 tryclose(self.pipeo)
172 tryclose(self.pipeo)
173 self.pipeo = None
173 self.pipeo = None
174 tryclose(self.pipee)
174 tryclose(self.pipee)
175 self.pipee = None
175 self.pipee = None
176 try:
176 try:
177 # Wait for process to terminate, making sure to avoid deadlock.
177 # Wait for process to terminate, making sure to avoid deadlock.
178 # See https://docs.python.org/2/library/subprocess.html for
178 # See https://docs.python.org/2/library/subprocess.html for
179 # warnings about wait() and deadlocking.
179 # warnings about wait() and deadlocking.
180 self.subprocess.communicate()
180 self.subprocess.communicate()
181 except Exception:
181 except Exception:
182 pass
182 pass
183 self.subprocess = None
183 self.subprocess = None
184 self.connected = False
184 self.connected = False
185
185
186 def request(self, request, flush=True):
186 def request(self, request, flush=True):
187 if self.connected:
187 if self.connected:
188 try:
188 try:
189 self.pipei.write(request)
189 self.pipei.write(request)
190 if flush:
190 if flush:
191 self.pipei.flush()
191 self.pipei.flush()
192 except IOError:
192 except IOError:
193 self.close()
193 self.close()
194
194
195 def receiveline(self):
195 def receiveline(self):
196 if not self.connected:
196 if not self.connected:
197 return None
197 return None
198 try:
198 try:
199 result = self.pipeo.readline()[:-1]
199 result = self.pipeo.readline()[:-1]
200 if not result:
200 if not result:
201 self.close()
201 self.close()
202 except IOError:
202 except IOError:
203 self.close()
203 self.close()
204
204
205 return result
205 return result
206
206
207
207
208 def _getfilesbatch(
208 def _getfilesbatch(
209 remote, receivemissing, progresstick, missed, idmap, batchsize
209 remote, receivemissing, progresstick, missed, idmap, batchsize
210 ):
210 ):
211 # Over http(s), iterbatch is a streamy method and we can start
211 # Over http(s), iterbatch is a streamy method and we can start
212 # looking at results early. This means we send one (potentially
212 # looking at results early. This means we send one (potentially
213 # large) request, but then we show nice progress as we process
213 # large) request, but then we show nice progress as we process
214 # file results, rather than showing chunks of $batchsize in
214 # file results, rather than showing chunks of $batchsize in
215 # progress.
215 # progress.
216 #
216 #
217 # Over ssh, iterbatch isn't streamy because batch() wasn't
217 # Over ssh, iterbatch isn't streamy because batch() wasn't
218 # explicitly designed as a streaming method. In the future we
218 # explicitly designed as a streaming method. In the future we
219 # should probably introduce a streambatch() method upstream and
219 # should probably introduce a streambatch() method upstream and
220 # use that for this.
220 # use that for this.
221 with remote.commandexecutor() as e:
221 with remote.commandexecutor() as e:
222 futures = []
222 futures = []
223 for m in missed:
223 for m in missed:
224 futures.append(
224 futures.append(
225 e.callcommand(
225 e.callcommand(
226 b'x_rfl_getfile', {b'file': idmap[m], b'node': m[-40:]}
226 b'x_rfl_getfile', {b'file': idmap[m], b'node': m[-40:]}
227 )
227 )
228 )
228 )
229
229
230 for i, m in enumerate(missed):
230 for i, m in enumerate(missed):
231 r = futures[i].result()
231 r = futures[i].result()
232 futures[i] = None # release memory
232 futures[i] = None # release memory
233 file_ = idmap[m]
233 file_ = idmap[m]
234 node = m[-40:]
234 node = m[-40:]
235 receivemissing(io.BytesIO(b'%d\n%s' % (len(r), r)), file_, node)
235 receivemissing(io.BytesIO(b'%d\n%s' % (len(r), r)), file_, node)
236 progresstick()
236 progresstick()
237
237
238
238
239 def _getfiles_optimistic(
239 def _getfiles_optimistic(
240 remote, receivemissing, progresstick, missed, idmap, step
240 remote, receivemissing, progresstick, missed, idmap, step
241 ):
241 ):
242 remote._callstream(b"x_rfl_getfiles")
242 remote._callstream(b"x_rfl_getfiles")
243 i = 0
243 i = 0
244 pipeo = remote._pipeo
244 pipeo = remote._pipeo
245 pipei = remote._pipei
245 pipei = remote._pipei
246 while i < len(missed):
246 while i < len(missed):
247 # issue a batch of requests
247 # issue a batch of requests
248 start = i
248 start = i
249 end = min(len(missed), start + step)
249 end = min(len(missed), start + step)
250 i = end
250 i = end
251 for missingid in missed[start:end]:
251 for missingid in missed[start:end]:
252 # issue new request
252 # issue new request
253 versionid = missingid[-40:]
253 versionid = missingid[-40:]
254 file = idmap[missingid]
254 file = idmap[missingid]
255 sshrequest = b"%s%s\n" % (versionid, file)
255 sshrequest = b"%s%s\n" % (versionid, file)
256 pipeo.write(sshrequest)
256 pipeo.write(sshrequest)
257 pipeo.flush()
257 pipeo.flush()
258
258
259 # receive batch results
259 # receive batch results
260 for missingid in missed[start:end]:
260 for missingid in missed[start:end]:
261 versionid = missingid[-40:]
261 versionid = missingid[-40:]
262 file = idmap[missingid]
262 file = idmap[missingid]
263 receivemissing(pipei, file, versionid)
263 receivemissing(pipei, file, versionid)
264 progresstick()
264 progresstick()
265
265
266 # End the command
266 # End the command
267 pipeo.write(b'\n')
267 pipeo.write(b'\n')
268 pipeo.flush()
268 pipeo.flush()
269
269
270
270
271 def _getfiles_threaded(
271 def _getfiles_threaded(
272 remote, receivemissing, progresstick, missed, idmap, step
272 remote, receivemissing, progresstick, missed, idmap, step
273 ):
273 ):
274 remote._callstream(b"getfiles")
274 remote._callstream(b"getfiles")
275 pipeo = remote._pipeo
275 pipeo = remote._pipeo
276 pipei = remote._pipei
276 pipei = remote._pipei
277
277
278 def writer():
278 def writer():
279 for missingid in missed:
279 for missingid in missed:
280 versionid = missingid[-40:]
280 versionid = missingid[-40:]
281 file = idmap[missingid]
281 file = idmap[missingid]
282 sshrequest = b"%s%s\n" % (versionid, file)
282 sshrequest = b"%s%s\n" % (versionid, file)
283 pipeo.write(sshrequest)
283 pipeo.write(sshrequest)
284 pipeo.flush()
284 pipeo.flush()
285
285
286 writerthread = threading.Thread(target=writer)
286 writerthread = threading.Thread(target=writer)
287 writerthread.daemon = True
287 writerthread.daemon = True
288 writerthread.start()
288 writerthread.start()
289
289
290 for missingid in missed:
290 for missingid in missed:
291 versionid = missingid[-40:]
291 versionid = missingid[-40:]
292 file = idmap[missingid]
292 file = idmap[missingid]
293 receivemissing(pipei, file, versionid)
293 receivemissing(pipei, file, versionid)
294 progresstick()
294 progresstick()
295
295
296 writerthread.join()
296 writerthread.join()
297 # End the command
297 # End the command
298 pipeo.write(b'\n')
298 pipeo.write(b'\n')
299 pipeo.flush()
299 pipeo.flush()
300
300
301
301
302 class fileserverclient(object):
302 class fileserverclient(object):
303 """A client for requesting files from the remote file server.
303 """A client for requesting files from the remote file server.
304 """
304 """
305
305
306 def __init__(self, repo):
306 def __init__(self, repo):
307 ui = repo.ui
307 ui = repo.ui
308 self.repo = repo
308 self.repo = repo
309 self.ui = ui
309 self.ui = ui
310 self.cacheprocess = ui.config(b"remotefilelog", b"cacheprocess")
310 self.cacheprocess = ui.config(b"remotefilelog", b"cacheprocess")
311 if self.cacheprocess:
311 if self.cacheprocess:
312 self.cacheprocess = util.expandpath(self.cacheprocess)
312 self.cacheprocess = util.expandpath(self.cacheprocess)
313
313
314 # This option causes remotefilelog to pass the full file path to the
314 # This option causes remotefilelog to pass the full file path to the
315 # cacheprocess instead of a hashed key.
315 # cacheprocess instead of a hashed key.
316 self.cacheprocesspasspath = ui.configbool(
316 self.cacheprocesspasspath = ui.configbool(
317 b"remotefilelog", b"cacheprocess.includepath"
317 b"remotefilelog", b"cacheprocess.includepath"
318 )
318 )
319
319
320 self.debugoutput = ui.configbool(b"remotefilelog", b"debug")
320 self.debugoutput = ui.configbool(b"remotefilelog", b"debug")
321
321
322 self.remotecache = cacheconnection()
322 self.remotecache = cacheconnection()
323
323
324 def setstore(self, datastore, historystore, writedata, writehistory):
324 def setstore(self, datastore, historystore, writedata, writehistory):
325 self.datastore = datastore
325 self.datastore = datastore
326 self.historystore = historystore
326 self.historystore = historystore
327 self.writedata = writedata
327 self.writedata = writedata
328 self.writehistory = writehistory
328 self.writehistory = writehistory
329
329
330 def _connect(self):
330 def _connect(self):
331 return self.repo.connectionpool.get(self.repo.fallbackpath)
331 return self.repo.connectionpool.get(self.repo.fallbackpath)
332
332
333 def request(self, fileids):
333 def request(self, fileids):
334 """Takes a list of filename/node pairs and fetches them from the
334 """Takes a list of filename/node pairs and fetches them from the
335 server. Files are stored in the local cache.
335 server. Files are stored in the local cache.
336 A list of nodes that the server couldn't find is returned.
336 A list of nodes that the server couldn't find is returned.
337 If the connection fails, an exception is raised.
337 If the connection fails, an exception is raised.
338 """
338 """
339 if not self.remotecache.connected:
339 if not self.remotecache.connected:
340 self.connect()
340 self.connect()
341 cache = self.remotecache
341 cache = self.remotecache
342 writedata = self.writedata
342 writedata = self.writedata
343
343
344 repo = self.repo
344 repo = self.repo
345 total = len(fileids)
345 total = len(fileids)
346 request = b"get\n%d\n" % total
346 request = b"get\n%d\n" % total
347 idmap = {}
347 idmap = {}
348 reponame = repo.name
348 reponame = repo.name
349 for file, id in fileids:
349 for file, id in fileids:
350 fullid = getcachekey(reponame, file, id)
350 fullid = getcachekey(reponame, file, id)
351 if self.cacheprocesspasspath:
351 if self.cacheprocesspasspath:
352 request += file + b'\0'
352 request += file + b'\0'
353 request += fullid + b"\n"
353 request += fullid + b"\n"
354 idmap[fullid] = file
354 idmap[fullid] = file
355
355
356 cache.request(request)
356 cache.request(request)
357
357
358 progress = self.ui.makeprogress(_(b'downloading'), total=total)
358 progress = self.ui.makeprogress(_(b'downloading'), total=total)
359 progress.update(0)
359 progress.update(0)
360
360
361 missed = []
361 missed = []
362 while True:
362 while True:
363 missingid = cache.receiveline()
363 missingid = cache.receiveline()
364 if not missingid:
364 if not missingid:
365 missedset = set(missed)
365 missedset = set(missed)
366 for missingid in idmap:
366 for missingid in idmap:
367 if not missingid in missedset:
367 if not missingid in missedset:
368 missed.append(missingid)
368 missed.append(missingid)
369 self.ui.warn(
369 self.ui.warn(
370 _(
370 _(
371 b"warning: cache connection closed early - "
371 b"warning: cache connection closed early - "
372 + b"falling back to server\n"
372 + b"falling back to server\n"
373 )
373 )
374 )
374 )
375 break
375 break
376 if missingid == b"0":
376 if missingid == b"0":
377 break
377 break
378 if missingid.startswith(b"_hits_"):
378 if missingid.startswith(b"_hits_"):
379 # receive progress reports
379 # receive progress reports
380 parts = missingid.split(b"_")
380 parts = missingid.split(b"_")
381 progress.increment(int(parts[2]))
381 progress.increment(int(parts[2]))
382 continue
382 continue
383
383
384 missed.append(missingid)
384 missed.append(missingid)
385
385
386 global fetchmisses
386 global fetchmisses
387 fetchmisses += len(missed)
387 fetchmisses += len(missed)
388
388
389 fromcache = total - len(missed)
389 fromcache = total - len(missed)
390 progress.update(fromcache, total=total)
390 progress.update(fromcache, total=total)
391 self.ui.log(
391 self.ui.log(
392 b"remotefilelog",
392 b"remotefilelog",
393 b"remote cache hit rate is %r of %r\n",
393 b"remote cache hit rate is %r of %r\n",
394 fromcache,
394 fromcache,
395 total,
395 total,
396 hit=fromcache,
396 hit=fromcache,
397 total=total,
397 total=total,
398 )
398 )
399
399
400 oldumask = os.umask(0o002)
400 oldumask = os.umask(0o002)
401 try:
401 try:
402 # receive cache misses from master
402 # receive cache misses from master
403 if missed:
403 if missed:
404 # When verbose is true, sshpeer prints 'running ssh...'
404 # When verbose is true, sshpeer prints 'running ssh...'
405 # to stdout, which can interfere with some command
405 # to stdout, which can interfere with some command
406 # outputs
406 # outputs
407 verbose = self.ui.verbose
407 verbose = self.ui.verbose
408 self.ui.verbose = False
408 self.ui.verbose = False
409 try:
409 try:
410 with self._connect() as conn:
410 with self._connect() as conn:
411 remote = conn.peer
411 remote = conn.peer
412 if remote.capable(
412 if remote.capable(
413 constants.NETWORK_CAP_LEGACY_SSH_GETFILES
413 constants.NETWORK_CAP_LEGACY_SSH_GETFILES
414 ):
414 ):
415 if not isinstance(remote, _sshv1peer):
415 if not isinstance(remote, _sshv1peer):
416 raise error.Abort(
416 raise error.Abort(
417 b'remotefilelog requires ssh servers'
417 b'remotefilelog requires ssh servers'
418 )
418 )
419 step = self.ui.configint(
419 step = self.ui.configint(
420 b'remotefilelog', b'getfilesstep'
420 b'remotefilelog', b'getfilesstep'
421 )
421 )
422 getfilestype = self.ui.config(
422 getfilestype = self.ui.config(
423 b'remotefilelog', b'getfilestype'
423 b'remotefilelog', b'getfilestype'
424 )
424 )
425 if getfilestype == b'threaded':
425 if getfilestype == b'threaded':
426 _getfiles = _getfiles_threaded
426 _getfiles = _getfiles_threaded
427 else:
427 else:
428 _getfiles = _getfiles_optimistic
428 _getfiles = _getfiles_optimistic
429 _getfiles(
429 _getfiles(
430 remote,
430 remote,
431 self.receivemissing,
431 self.receivemissing,
432 progress.increment,
432 progress.increment,
433 missed,
433 missed,
434 idmap,
434 idmap,
435 step,
435 step,
436 )
436 )
437 elif remote.capable(b"x_rfl_getfile"):
437 elif remote.capable(b"x_rfl_getfile"):
438 if remote.capable(b'batch'):
438 if remote.capable(b'batch'):
439 batchdefault = 100
439 batchdefault = 100
440 else:
440 else:
441 batchdefault = 10
441 batchdefault = 10
442 batchsize = self.ui.configint(
442 batchsize = self.ui.configint(
443 b'remotefilelog', b'batchsize', batchdefault
443 b'remotefilelog', b'batchsize', batchdefault
444 )
444 )
445 self.ui.debug(
445 self.ui.debug(
446 b'requesting %d files from '
446 b'requesting %d files from '
447 b'remotefilelog server...\n' % len(missed)
447 b'remotefilelog server...\n' % len(missed)
448 )
448 )
449 _getfilesbatch(
449 _getfilesbatch(
450 remote,
450 remote,
451 self.receivemissing,
451 self.receivemissing,
452 progress.increment,
452 progress.increment,
453 missed,
453 missed,
454 idmap,
454 idmap,
455 batchsize,
455 batchsize,
456 )
456 )
457 else:
457 else:
458 raise error.Abort(
458 raise error.Abort(
459 b"configured remotefilelog server"
459 b"configured remotefilelog server"
460 b" does not support remotefilelog"
460 b" does not support remotefilelog"
461 )
461 )
462
462
463 self.ui.log(
463 self.ui.log(
464 b"remotefilefetchlog",
464 b"remotefilefetchlog",
465 b"Success\n",
465 b"Success\n",
466 fetched_files=progress.pos - fromcache,
466 fetched_files=progress.pos - fromcache,
467 total_to_fetch=total - fromcache,
467 total_to_fetch=total - fromcache,
468 )
468 )
469 except Exception:
469 except Exception:
470 self.ui.log(
470 self.ui.log(
471 b"remotefilefetchlog",
471 b"remotefilefetchlog",
472 b"Fail\n",
472 b"Fail\n",
473 fetched_files=progress.pos - fromcache,
473 fetched_files=progress.pos - fromcache,
474 total_to_fetch=total - fromcache,
474 total_to_fetch=total - fromcache,
475 )
475 )
476 raise
476 raise
477 finally:
477 finally:
478 self.ui.verbose = verbose
478 self.ui.verbose = verbose
479 # send to memcache
479 # send to memcache
480 request = b"set\n%d\n%s\n" % (len(missed), b"\n".join(missed))
480 request = b"set\n%d\n%s\n" % (len(missed), b"\n".join(missed))
481 cache.request(request)
481 cache.request(request)
482
482
483 progress.complete()
483 progress.complete()
484
484
485 # mark ourselves as a user of this cache
485 # mark ourselves as a user of this cache
486 writedata.markrepo(self.repo.path)
486 writedata.markrepo(self.repo.path)
487 finally:
487 finally:
488 os.umask(oldumask)
488 os.umask(oldumask)
489
489
490 def receivemissing(self, pipe, filename, node):
490 def receivemissing(self, pipe, filename, node):
491 line = pipe.readline()[:-1]
491 line = pipe.readline()[:-1]
492 if not line:
492 if not line:
493 raise error.ResponseError(
493 raise error.ResponseError(
494 _(b"error downloading file contents:"),
494 _(b"error downloading file contents:"),
495 _(b"connection closed early"),
495 _(b"connection closed early"),
496 )
496 )
497 size = int(line)
497 size = int(line)
498 data = pipe.read(size)
498 data = pipe.read(size)
499 if len(data) != size:
499 if len(data) != size:
500 raise error.ResponseError(
500 raise error.ResponseError(
501 _(b"error downloading file contents:"),
501 _(b"error downloading file contents:"),
502 _(b"only received %s of %s bytes") % (len(data), size),
502 _(b"only received %s of %s bytes") % (len(data), size),
503 )
503 )
504
504
505 self.writedata.addremotefilelognode(
505 self.writedata.addremotefilelognode(
506 filename, bin(node), zlib.decompress(data)
506 filename, bin(node), zlib.decompress(data)
507 )
507 )
508
508
509 def connect(self):
509 def connect(self):
510 if self.cacheprocess:
510 if self.cacheprocess:
511 cmd = b"%s %s" % (self.cacheprocess, self.writedata._path)
511 cmd = b"%s %s" % (self.cacheprocess, self.writedata._path)
512 self.remotecache.connect(cmd)
512 self.remotecache.connect(cmd)
513 else:
513 else:
514 # If no cache process is specified, we fake one that always
514 # If no cache process is specified, we fake one that always
515 # returns cache misses. This enables tests to run easily
515 # returns cache misses. This enables tests to run easily
516 # and may eventually allow us to be a drop in replacement
516 # and may eventually allow us to be a drop in replacement
517 # for the largefiles extension.
517 # for the largefiles extension.
518 class simplecache(object):
518 class simplecache(object):
519 def __init__(self):
519 def __init__(self):
520 self.missingids = []
520 self.missingids = []
521 self.connected = True
521 self.connected = True
522
522
523 def close(self):
523 def close(self):
524 pass
524 pass
525
525
526 def request(self, value, flush=True):
526 def request(self, value, flush=True):
527 lines = value.split(b"\n")
527 lines = value.split(b"\n")
528 if lines[0] != b"get":
528 if lines[0] != b"get":
529 return
529 return
530 self.missingids = lines[2:-1]
530 self.missingids = lines[2:-1]
531 self.missingids.append(b'0')
531 self.missingids.append(b'0')
532
532
533 def receiveline(self):
533 def receiveline(self):
534 if len(self.missingids) > 0:
534 if len(self.missingids) > 0:
535 return self.missingids.pop(0)
535 return self.missingids.pop(0)
536 return None
536 return None
537
537
538 self.remotecache = simplecache()
538 self.remotecache = simplecache()
539
539
540 def close(self):
540 def close(self):
541 if fetches:
541 if fetches:
542 msg = (
542 msg = (
543 b"%d files fetched over %d fetches - "
543 b"%d files fetched over %d fetches - "
544 + b"(%d misses, %0.2f%% hit ratio) over %0.2fs\n"
544 + b"(%d misses, %0.2f%% hit ratio) over %0.2fs\n"
545 ) % (
545 ) % (
546 fetched,
546 fetched,
547 fetches,
547 fetches,
548 fetchmisses,
548 fetchmisses,
549 float(fetched - fetchmisses) / float(fetched) * 100.0,
549 float(fetched - fetchmisses) / float(fetched) * 100.0,
550 fetchcost,
550 fetchcost,
551 )
551 )
552 if self.debugoutput:
552 if self.debugoutput:
553 self.ui.warn(msg)
553 self.ui.warn(msg)
554 self.ui.log(
554 self.ui.log(
555 b"remotefilelog.prefetch",
555 b"remotefilelog.prefetch",
556 msg.replace(b"%", b"%%"),
556 msg.replace(b"%", b"%%"),
557 remotefilelogfetched=fetched,
557 remotefilelogfetched=fetched,
558 remotefilelogfetches=fetches,
558 remotefilelogfetches=fetches,
559 remotefilelogfetchmisses=fetchmisses,
559 remotefilelogfetchmisses=fetchmisses,
560 remotefilelogfetchtime=fetchcost * 1000,
560 remotefilelogfetchtime=fetchcost * 1000,
561 )
561 )
562
562
563 if self.remotecache.connected:
563 if self.remotecache.connected:
564 self.remotecache.close()
564 self.remotecache.close()
565
565
566 def prefetch(
566 def prefetch(
567 self, fileids, force=False, fetchdata=True, fetchhistory=False
567 self, fileids, force=False, fetchdata=True, fetchhistory=False
568 ):
568 ):
569 """downloads the given file versions to the cache
569 """downloads the given file versions to the cache
570 """
570 """
571 repo = self.repo
571 repo = self.repo
572 idstocheck = []
572 idstocheck = []
573 for file, id in fileids:
573 for file, id in fileids:
574 # hack
574 # hack
575 # - we don't use .hgtags
575 # - we don't use .hgtags
576 # - workingctx produces ids with length 42,
576 # - workingctx produces ids with length 42,
577 # which we skip since they aren't in any cache
577 # which we skip since they aren't in any cache
578 if (
578 if (
579 file == b'.hgtags'
579 file == b'.hgtags'
580 or len(id) == 42
580 or len(id) == 42
581 or not repo.shallowmatch(file)
581 or not repo.shallowmatch(file)
582 ):
582 ):
583 continue
583 continue
584
584
585 idstocheck.append((file, bin(id)))
585 idstocheck.append((file, bin(id)))
586
586
587 datastore = self.datastore
587 datastore = self.datastore
588 historystore = self.historystore
588 historystore = self.historystore
589 if force:
589 if force:
590 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
590 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
591 historystore = metadatastore.unionmetadatastore(
591 historystore = metadatastore.unionmetadatastore(
592 *repo.sharedhistorystores
592 *repo.sharedhistorystores
593 )
593 )
594
594
595 missingids = set()
595 missingids = set()
596 if fetchdata:
596 if fetchdata:
597 missingids.update(datastore.getmissing(idstocheck))
597 missingids.update(datastore.getmissing(idstocheck))
598 if fetchhistory:
598 if fetchhistory:
599 missingids.update(historystore.getmissing(idstocheck))
599 missingids.update(historystore.getmissing(idstocheck))
600
600
601 # partition missing nodes into nullid and not-nullid so we can
601 # partition missing nodes into nullid and not-nullid so we can
602 # warn about this filtering potentially shadowing bugs.
602 # warn about this filtering potentially shadowing bugs.
603 nullids = len([None for unused, id in missingids if id == nullid])
603 nullids = len([None for unused, id in missingids if id == nullid])
604 if nullids:
604 if nullids:
605 missingids = [(f, id) for f, id in missingids if id != nullid]
605 missingids = [(f, id) for f, id in missingids if id != nullid]
606 repo.ui.develwarn(
606 repo.ui.develwarn(
607 (
607 (
608 b'remotefilelog not fetching %d null revs'
608 b'remotefilelog not fetching %d null revs'
609 b' - this is likely hiding bugs' % nullids
609 b' - this is likely hiding bugs' % nullids
610 ),
610 ),
611 config=b'remotefilelog-ext',
611 config=b'remotefilelog-ext',
612 )
612 )
613 if missingids:
613 if missingids:
614 global fetches, fetched, fetchcost
614 global fetches, fetched, fetchcost
615 fetches += 1
615 fetches += 1
616
616
617 # We want to be able to detect excess individual file downloads, so
617 # We want to be able to detect excess individual file downloads, so
618 # let's log that information for debugging.
618 # let's log that information for debugging.
619 if fetches >= 15 and fetches < 18:
619 if fetches >= 15 and fetches < 18:
620 if fetches == 15:
620 if fetches == 15:
621 fetchwarning = self.ui.config(
621 fetchwarning = self.ui.config(
622 b'remotefilelog', b'fetchwarning'
622 b'remotefilelog', b'fetchwarning'
623 )
623 )
624 if fetchwarning:
624 if fetchwarning:
625 self.ui.warn(fetchwarning + b'\n')
625 self.ui.warn(fetchwarning + b'\n')
626 self.logstacktrace()
626 self.logstacktrace()
627 missingids = [(file, hex(id)) for file, id in sorted(missingids)]
627 missingids = [(file, hex(id)) for file, id in sorted(missingids)]
628 fetched += len(missingids)
628 fetched += len(missingids)
629 start = time.time()
629 start = time.time()
630 missingids = self.request(missingids)
630 missingids = self.request(missingids)
631 if missingids:
631 if missingids:
632 raise error.Abort(
632 raise error.Abort(
633 _(b"unable to download %d files") % len(missingids)
633 _(b"unable to download %d files") % len(missingids)
634 )
634 )
635 fetchcost += time.time() - start
635 fetchcost += time.time() - start
636 self._lfsprefetch(fileids)
636 self._lfsprefetch(fileids)
637
637
638 def _lfsprefetch(self, fileids):
638 def _lfsprefetch(self, fileids):
639 if not _lfsmod or not util.safehasattr(
639 if not _lfsmod or not util.safehasattr(
640 self.repo.svfs, b'lfslocalblobstore'
640 self.repo.svfs, b'lfslocalblobstore'
641 ):
641 ):
642 return
642 return
643 if not _lfsmod.wrapper.candownload(self.repo):
643 if not _lfsmod.wrapper.candownload(self.repo):
644 return
644 return
645 pointers = []
645 pointers = []
646 store = self.repo.svfs.lfslocalblobstore
646 store = self.repo.svfs.lfslocalblobstore
647 for file, id in fileids:
647 for file, id in fileids:
648 node = bin(id)
648 node = bin(id)
649 rlog = self.repo.file(file)
649 rlog = self.repo.file(file)
650 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
650 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
651 text = rlog.rawdata(node)
651 text = rlog.rawdata(node)
652 p = _lfsmod.pointer.deserialize(text)
652 p = _lfsmod.pointer.deserialize(text)
653 oid = p.oid()
653 oid = p.oid()
654 if not store.has(oid):
654 if not store.has(oid):
655 pointers.append(p)
655 pointers.append(p)
656 if len(pointers) > 0:
656 if len(pointers) > 0:
657 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
657 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
658 assert all(store.has(p.oid()) for p in pointers)
658 assert all(store.has(p.oid()) for p in pointers)
659
659
660 def logstacktrace(self):
660 def logstacktrace(self):
661 import traceback
661 import traceback
662
662
663 self.ui.log(
663 self.ui.log(
664 b'remotefilelog',
664 b'remotefilelog',
665 b'excess remotefilelog fetching:\n%s\n',
665 b'excess remotefilelog fetching:\n%s\n',
666 b''.join(traceback.format_stack()),
666 b''.join(pycompat.sysbytes(traceback.format_stack())),
667 )
667 )
General Comments 0
You need to be logged in to leave comments. Login now