rawdata: update caller in remotefilelog...
marmoute
r43016:d53975bb default draft
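This changeset moves remotefilelog's callers from revision(node, raw=True) to the dedicated rawdata(node) accessor; the four hunks below update contentstore.py, fileserverclient.py, remotefilelog.py and remotefilelogserver.py accordingly. A minimal sketch of the calling pattern, assuming a revlog-like store object; the helper name and the hasattr fallback are illustrative and not part of the commit:

    # Sketch only: fetch the raw (flag-unprocessed) revision bytes, preferring
    # the rawdata() accessor this changeset switches to, with a fallback to the
    # older revision(..., raw=True) spelling for stores that predate it.
    def getrawtext(store, node):
        if hasattr(store, 'rawdata'):
            return store.rawdata(node)           # new accessor used in the hunks below
        return store.revision(node, raw=True)    # legacy spelling being replaced

In the diff itself the new spelling is used unconditionally, since the revlog objects at these call sites already expose rawdata() at this revision.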
hgext/remotefilelog/contentstore.py
@@ -1,376 +1,376 b''
1 1 from __future__ import absolute_import
2 2
3 3 import threading
4 4
5 5 from mercurial.node import hex, nullid
6 6 from mercurial import (
7 7 mdiff,
8 8 pycompat,
9 9 revlog,
10 10 )
11 11 from . import (
12 12 basestore,
13 13 constants,
14 14 shallowutil,
15 15 )
16 16
17 17 class ChainIndicies(object):
18 18 """A static class for easy reference to the delta chain indicies.
19 19 """
20 20 # The filename of this revision delta
21 21 NAME = 0
22 22 # The mercurial file node for this revision delta
23 23 NODE = 1
24 24 # The filename of the delta base's revision. This is useful when delta
25 25 # between different files (like in the case of a move or copy, we can delta
26 26 # against the original file content).
27 27 BASENAME = 2
28 28 # The mercurial file node for the delta base revision. This is the nullid if
29 29 # this delta is a full text.
30 30 BASENODE = 3
31 31 # The actual delta or full text data.
32 32 DATA = 4
33 33
34 34 class unioncontentstore(basestore.baseunionstore):
35 35 def __init__(self, *args, **kwargs):
36 36 super(unioncontentstore, self).__init__(*args, **kwargs)
37 37
38 38 self.stores = args
39 39 self.writestore = kwargs.get(r'writestore')
40 40
41 41 # If allowincomplete==True then the union store can return partial
42 42 # delta chains, otherwise it will throw a KeyError if a full
43 43 # deltachain can't be found.
44 44 self.allowincomplete = kwargs.get(r'allowincomplete', False)
45 45
46 46 def get(self, name, node):
47 47 """Fetches the full text revision contents of the given name+node pair.
48 48 If the full text doesn't exist, throws a KeyError.
49 49
50 50 Under the hood, this uses getdeltachain() across all the stores to build
51 51 up a full chain to produce the full text.
52 52 """
53 53 chain = self.getdeltachain(name, node)
54 54
55 55 if chain[-1][ChainIndicies.BASENODE] != nullid:
56 56 # If we didn't receive a full chain, throw
57 57 raise KeyError((name, hex(node)))
58 58
59 59 # The last entry in the chain is a full text, so we start our delta
60 60 # applies with that.
61 61 fulltext = chain.pop()[ChainIndicies.DATA]
62 62
63 63 text = fulltext
64 64 while chain:
65 65 delta = chain.pop()[ChainIndicies.DATA]
66 66 text = mdiff.patches(text, [delta])
67 67
68 68 return text
69 69
70 70 @basestore.baseunionstore.retriable
71 71 def getdelta(self, name, node):
72 72 """Return the single delta entry for the given name/node pair.
73 73 """
74 74 for store in self.stores:
75 75 try:
76 76 return store.getdelta(name, node)
77 77 except KeyError:
78 78 pass
79 79
80 80 raise KeyError((name, hex(node)))
81 81
82 82 def getdeltachain(self, name, node):
83 83 """Returns the deltachain for the given name/node pair.
84 84
85 85 Returns an ordered list of:
86 86
87 87 [(name, node, deltabasename, deltabasenode, deltacontent),...]
88 88
89 89 where the chain is terminated by a full text entry with a nullid
90 90 deltabasenode.
91 91 """
92 92 chain = self._getpartialchain(name, node)
93 93 while chain[-1][ChainIndicies.BASENODE] != nullid:
94 94 x, x, deltabasename, deltabasenode, x = chain[-1]
95 95 try:
96 96 morechain = self._getpartialchain(deltabasename, deltabasenode)
97 97 chain.extend(morechain)
98 98 except KeyError:
99 99 # If we allow incomplete chains, don't throw.
100 100 if not self.allowincomplete:
101 101 raise
102 102 break
103 103
104 104 return chain
105 105
106 106 @basestore.baseunionstore.retriable
107 107 def getmeta(self, name, node):
108 108 """Returns the metadata dict for given node."""
109 109 for store in self.stores:
110 110 try:
111 111 return store.getmeta(name, node)
112 112 except KeyError:
113 113 pass
114 114 raise KeyError((name, hex(node)))
115 115
116 116 def getmetrics(self):
117 117 metrics = [s.getmetrics() for s in self.stores]
118 118 return shallowutil.sumdicts(*metrics)
119 119
120 120 @basestore.baseunionstore.retriable
121 121 def _getpartialchain(self, name, node):
122 122 """Returns a partial delta chain for the given name/node pair.
123 123
124 124 A partial chain is a chain that may not be terminated in a full-text.
125 125 """
126 126 for store in self.stores:
127 127 try:
128 128 return store.getdeltachain(name, node)
129 129 except KeyError:
130 130 pass
131 131
132 132 raise KeyError((name, hex(node)))
133 133
134 134 def add(self, name, node, data):
135 135 raise RuntimeError("cannot add content only to remotefilelog "
136 136 "contentstore")
137 137
138 138 def getmissing(self, keys):
139 139 missing = keys
140 140 for store in self.stores:
141 141 if missing:
142 142 missing = store.getmissing(missing)
143 143 return missing
144 144
145 145 def addremotefilelognode(self, name, node, data):
146 146 if self.writestore:
147 147 self.writestore.addremotefilelognode(name, node, data)
148 148 else:
149 149 raise RuntimeError("no writable store configured")
150 150
151 151 def markledger(self, ledger, options=None):
152 152 for store in self.stores:
153 153 store.markledger(ledger, options)
154 154
155 155 class remotefilelogcontentstore(basestore.basestore):
156 156 def __init__(self, *args, **kwargs):
157 157 super(remotefilelogcontentstore, self).__init__(*args, **kwargs)
158 158 self._threaddata = threading.local()
159 159
160 160 def get(self, name, node):
161 161 # return raw revision text
162 162 data = self._getdata(name, node)
163 163
164 164 offset, size, flags = shallowutil.parsesizeflags(data)
165 165 content = data[offset:offset + size]
166 166
167 167 ancestormap = shallowutil.ancestormap(data)
168 168 p1, p2, linknode, copyfrom = ancestormap[node]
169 169 copyrev = None
170 170 if copyfrom:
171 171 copyrev = hex(p1)
172 172
173 173 self._updatemetacache(node, size, flags)
174 174
175 175 # lfs tracks renames in its own metadata, remove hg copy metadata,
176 176 # because copy metadata will be re-added by lfs flag processor.
177 177 if flags & revlog.REVIDX_EXTSTORED:
178 178 copyrev = copyfrom = None
179 179 revision = shallowutil.createrevlogtext(content, copyfrom, copyrev)
180 180 return revision
181 181
182 182 def getdelta(self, name, node):
183 183 # Since remotefilelog content stores only contain full texts, just
184 184 # return that.
185 185 revision = self.get(name, node)
186 186 return revision, name, nullid, self.getmeta(name, node)
187 187
188 188 def getdeltachain(self, name, node):
189 189 # Since remotefilelog content stores just contain full texts, we return
190 190 # a fake delta chain that just consists of a single full text revision.
191 191 # The nullid in the deltabasenode slot indicates that the revision is a
192 192 # fulltext.
193 193 revision = self.get(name, node)
194 194 return [(name, node, None, nullid, revision)]
195 195
196 196 def getmeta(self, name, node):
197 197 self._sanitizemetacache()
198 198 if node != self._threaddata.metacache[0]:
199 199 data = self._getdata(name, node)
200 200 offset, size, flags = shallowutil.parsesizeflags(data)
201 201 self._updatemetacache(node, size, flags)
202 202 return self._threaddata.metacache[1]
203 203
204 204 def add(self, name, node, data):
205 205 raise RuntimeError("cannot add content only to remotefilelog "
206 206 "contentstore")
207 207
208 208 def _sanitizemetacache(self):
209 209 metacache = getattr(self._threaddata, 'metacache', None)
210 210 if metacache is None:
211 211 self._threaddata.metacache = (None, None) # (node, meta)
212 212
213 213 def _updatemetacache(self, node, size, flags):
214 214 self._sanitizemetacache()
215 215 if node == self._threaddata.metacache[0]:
216 216 return
217 217 meta = {constants.METAKEYFLAG: flags,
218 218 constants.METAKEYSIZE: size}
219 219 self._threaddata.metacache = (node, meta)
220 220
221 221 class remotecontentstore(object):
222 222 def __init__(self, ui, fileservice, shared):
223 223 self._fileservice = fileservice
224 224 # type(shared) is usually remotefilelogcontentstore
225 225 self._shared = shared
226 226
227 227 def get(self, name, node):
228 228 self._fileservice.prefetch([(name, hex(node))], force=True,
229 229 fetchdata=True)
230 230 return self._shared.get(name, node)
231 231
232 232 def getdelta(self, name, node):
233 233 revision = self.get(name, node)
234 234 return revision, name, nullid, self._shared.getmeta(name, node)
235 235
236 236 def getdeltachain(self, name, node):
237 237 # Since our remote content stores just contain full texts, we return a
238 238 # fake delta chain that just consists of a single full text revision.
239 239 # The nullid in the deltabasenode slot indicates that the revision is a
240 240 # fulltext.
241 241 revision = self.get(name, node)
242 242 return [(name, node, None, nullid, revision)]
243 243
244 244 def getmeta(self, name, node):
245 245 self._fileservice.prefetch([(name, hex(node))], force=True,
246 246 fetchdata=True)
247 247 return self._shared.getmeta(name, node)
248 248
249 249 def add(self, name, node, data):
250 250 raise RuntimeError("cannot add to a remote store")
251 251
252 252 def getmissing(self, keys):
253 253 return keys
254 254
255 255 def markledger(self, ledger, options=None):
256 256 pass
257 257
258 258 class manifestrevlogstore(object):
259 259 def __init__(self, repo):
260 260 self._store = repo.store
261 261 self._svfs = repo.svfs
262 262 self._revlogs = dict()
263 263 self._cl = revlog.revlog(self._svfs, '00changelog.i')
264 264 self._repackstartlinkrev = 0
265 265
266 266 def get(self, name, node):
267 return self._revlog(name).revision(node, raw=True)
267 return self._revlog(name).rawdata(node)
268 268
269 269 def getdelta(self, name, node):
270 270 revision = self.get(name, node)
271 271 return revision, name, nullid, self.getmeta(name, node)
272 272
273 273 def getdeltachain(self, name, node):
274 274 revision = self.get(name, node)
275 275 return [(name, node, None, nullid, revision)]
276 276
277 277 def getmeta(self, name, node):
278 278 rl = self._revlog(name)
279 279 rev = rl.rev(node)
280 280 return {constants.METAKEYFLAG: rl.flags(rev),
281 281 constants.METAKEYSIZE: rl.rawsize(rev)}
282 282
283 283 def getancestors(self, name, node, known=None):
284 284 if known is None:
285 285 known = set()
286 286 if node in known:
287 287 return []
288 288
289 289 rl = self._revlog(name)
290 290 ancestors = {}
291 291 missing = set((node,))
292 292 for ancrev in rl.ancestors([rl.rev(node)], inclusive=True):
293 293 ancnode = rl.node(ancrev)
294 294 missing.discard(ancnode)
295 295
296 296 p1, p2 = rl.parents(ancnode)
297 297 if p1 != nullid and p1 not in known:
298 298 missing.add(p1)
299 299 if p2 != nullid and p2 not in known:
300 300 missing.add(p2)
301 301
302 302 linknode = self._cl.node(rl.linkrev(ancrev))
303 303 ancestors[rl.node(ancrev)] = (p1, p2, linknode, '')
304 304 if not missing:
305 305 break
306 306 return ancestors
307 307
308 308 def getnodeinfo(self, name, node):
309 309 cl = self._cl
310 310 rl = self._revlog(name)
311 311 parents = rl.parents(node)
312 312 linkrev = rl.linkrev(rl.rev(node))
313 313 return (parents[0], parents[1], cl.node(linkrev), None)
314 314
315 315 def add(self, *args):
316 316 raise RuntimeError("cannot add to a revlog store")
317 317
318 318 def _revlog(self, name):
319 319 rl = self._revlogs.get(name)
320 320 if rl is None:
321 321 revlogname = '00manifesttree.i'
322 322 if name != '':
323 323 revlogname = 'meta/%s/00manifest.i' % name
324 324 rl = revlog.revlog(self._svfs, revlogname)
325 325 self._revlogs[name] = rl
326 326 return rl
327 327
328 328 def getmissing(self, keys):
329 329 missing = []
330 330 for name, node in keys:
331 331 mfrevlog = self._revlog(name)
332 332 if node not in mfrevlog.nodemap:
333 333 missing.append((name, node))
334 334
335 335 return missing
336 336
337 337 def setrepacklinkrevrange(self, startrev, endrev):
338 338 self._repackstartlinkrev = startrev
339 339 self._repackendlinkrev = endrev
340 340
341 341 def markledger(self, ledger, options=None):
342 342 if options and options.get(constants.OPTION_PACKSONLY):
343 343 return
344 344 treename = ''
345 345 rl = revlog.revlog(self._svfs, '00manifesttree.i')
346 346 startlinkrev = self._repackstartlinkrev
347 347 endlinkrev = self._repackendlinkrev
348 348 for rev in pycompat.xrange(len(rl) - 1, -1, -1):
349 349 linkrev = rl.linkrev(rev)
350 350 if linkrev < startlinkrev:
351 351 break
352 352 if linkrev > endlinkrev:
353 353 continue
354 354 node = rl.node(rev)
355 355 ledger.markdataentry(self, treename, node)
356 356 ledger.markhistoryentry(self, treename, node)
357 357
358 358 for path, encoded, size in self._store.datafiles():
359 359 if path[:5] != 'meta/' or path[-2:] != '.i':
360 360 continue
361 361
362 362 treename = path[5:-len('/00manifest.i')]
363 363
364 364 rl = revlog.revlog(self._svfs, path)
365 365 for rev in pycompat.xrange(len(rl) - 1, -1, -1):
366 366 linkrev = rl.linkrev(rev)
367 367 if linkrev < startlinkrev:
368 368 break
369 369 if linkrev > endlinkrev:
370 370 continue
371 371 node = rl.node(rev)
372 372 ledger.markdataentry(self, treename, node)
373 373 ledger.markhistoryentry(self, treename, node)
374 374
375 375 def cleanup(self, ledger):
376 376 pass
hgext/remotefilelog/fileserverclient.py
@@ -1,584 +1,584 b''
1 1 # fileserverclient.py - client for communicating with the cache process
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import io
12 12 import os
13 13 import threading
14 14 import time
15 15 import zlib
16 16
17 17 from mercurial.i18n import _
18 18 from mercurial.node import bin, hex, nullid
19 19 from mercurial import (
20 20 error,
21 21 node,
22 22 pycompat,
23 23 revlog,
24 24 sshpeer,
25 25 util,
26 26 wireprotov1peer,
27 27 )
28 28 from mercurial.utils import procutil
29 29
30 30 from . import (
31 31 constants,
32 32 contentstore,
33 33 metadatastore,
34 34 )
35 35
36 36 _sshv1peer = sshpeer.sshv1peer
37 37
38 38 # Statistics for debugging
39 39 fetchcost = 0
40 40 fetches = 0
41 41 fetched = 0
42 42 fetchmisses = 0
43 43
44 44 _lfsmod = None
45 45
46 46 def getcachekey(reponame, file, id):
47 47 pathhash = node.hex(hashlib.sha1(file).digest())
48 48 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
49 49
50 50 def getlocalkey(file, id):
51 51 pathhash = node.hex(hashlib.sha1(file).digest())
52 52 return os.path.join(pathhash, id)
53 53
54 54 def peersetup(ui, peer):
55 55
56 56 class remotefilepeer(peer.__class__):
57 57 @wireprotov1peer.batchable
58 58 def x_rfl_getfile(self, file, node):
59 59 if not self.capable('x_rfl_getfile'):
60 60 raise error.Abort(
61 61 'configured remotefile server does not support getfile')
62 62 f = wireprotov1peer.future()
63 63 yield {'file': file, 'node': node}, f
64 64 code, data = f.value.split('\0', 1)
65 65 if int(code):
66 66 raise error.LookupError(file, node, data)
67 67 yield data
68 68
69 69 @wireprotov1peer.batchable
70 70 def x_rfl_getflogheads(self, path):
71 71 if not self.capable('x_rfl_getflogheads'):
72 72 raise error.Abort('configured remotefile server does not '
73 73 'support getflogheads')
74 74 f = wireprotov1peer.future()
75 75 yield {'path': path}, f
76 76 heads = f.value.split('\n') if f.value else []
77 77 yield heads
78 78
79 79 def _updatecallstreamopts(self, command, opts):
80 80 if command != 'getbundle':
81 81 return
82 82 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
83 83 not in self.capabilities()):
84 84 return
85 85 if not util.safehasattr(self, '_localrepo'):
86 86 return
87 87 if (constants.SHALLOWREPO_REQUIREMENT
88 88 not in self._localrepo.requirements):
89 89 return
90 90
91 91 bundlecaps = opts.get('bundlecaps')
92 92 if bundlecaps:
93 93 bundlecaps = [bundlecaps]
94 94 else:
95 95 bundlecaps = []
96 96
97 97 # shallow, includepattern, and excludepattern are a hacky way of
98 98 # carrying over data from the local repo to this getbundle
99 99 # command. We need to do it this way because bundle1 getbundle
100 100 # doesn't provide any other place we can hook in to manipulate
101 101 # getbundle args before it goes across the wire. Once we get rid
102 102 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
103 103 # do this more cleanly.
104 104 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
105 105 if self._localrepo.includepattern:
106 106 patterns = '\0'.join(self._localrepo.includepattern)
107 107 includecap = "includepattern=" + patterns
108 108 bundlecaps.append(includecap)
109 109 if self._localrepo.excludepattern:
110 110 patterns = '\0'.join(self._localrepo.excludepattern)
111 111 excludecap = "excludepattern=" + patterns
112 112 bundlecaps.append(excludecap)
113 113 opts['bundlecaps'] = ','.join(bundlecaps)
114 114
115 115 def _sendrequest(self, command, args, **opts):
116 116 self._updatecallstreamopts(command, args)
117 117 return super(remotefilepeer, self)._sendrequest(command, args,
118 118 **opts)
119 119
120 120 def _callstream(self, command, **opts):
121 121 supertype = super(remotefilepeer, self)
122 122 if not util.safehasattr(supertype, '_sendrequest'):
123 123 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
124 124 return super(remotefilepeer, self)._callstream(command, **opts)
125 125
126 126 peer.__class__ = remotefilepeer
127 127
128 128 class cacheconnection(object):
129 129 """The connection for communicating with the remote cache. Performs
130 130 gets and sets by communicating with an external process that has the
131 131 cache-specific implementation.
132 132 """
133 133 def __init__(self):
134 134 self.pipeo = self.pipei = self.pipee = None
135 135 self.subprocess = None
136 136 self.connected = False
137 137
138 138 def connect(self, cachecommand):
139 139 if self.pipeo:
140 140 raise error.Abort(_("cache connection already open"))
141 141 self.pipei, self.pipeo, self.pipee, self.subprocess = (
142 142 procutil.popen4(cachecommand))
143 143 self.connected = True
144 144
145 145 def close(self):
146 146 def tryclose(pipe):
147 147 try:
148 148 pipe.close()
149 149 except Exception:
150 150 pass
151 151 if self.connected:
152 152 try:
153 153 self.pipei.write("exit\n")
154 154 except Exception:
155 155 pass
156 156 tryclose(self.pipei)
157 157 self.pipei = None
158 158 tryclose(self.pipeo)
159 159 self.pipeo = None
160 160 tryclose(self.pipee)
161 161 self.pipee = None
162 162 try:
163 163 # Wait for process to terminate, making sure to avoid deadlock.
164 164 # See https://docs.python.org/2/library/subprocess.html for
165 165 # warnings about wait() and deadlocking.
166 166 self.subprocess.communicate()
167 167 except Exception:
168 168 pass
169 169 self.subprocess = None
170 170 self.connected = False
171 171
172 172 def request(self, request, flush=True):
173 173 if self.connected:
174 174 try:
175 175 self.pipei.write(request)
176 176 if flush:
177 177 self.pipei.flush()
178 178 except IOError:
179 179 self.close()
180 180
181 181 def receiveline(self):
182 182 if not self.connected:
183 183 return None
184 184 try:
185 185 result = self.pipeo.readline()[:-1]
186 186 if not result:
187 187 self.close()
188 188 except IOError:
189 189 self.close()
190 190
191 191 return result
192 192
193 193 def _getfilesbatch(
194 194 remote, receivemissing, progresstick, missed, idmap, batchsize):
195 195 # Over http(s), iterbatch is a streamy method and we can start
196 196 # looking at results early. This means we send one (potentially
197 197 # large) request, but then we show nice progress as we process
198 198 # file results, rather than showing chunks of $batchsize in
199 199 # progress.
200 200 #
201 201 # Over ssh, iterbatch isn't streamy because batch() wasn't
202 202 # explicitly designed as a streaming method. In the future we
203 203 # should probably introduce a streambatch() method upstream and
204 204 # use that for this.
205 205 with remote.commandexecutor() as e:
206 206 futures = []
207 207 for m in missed:
208 208 futures.append(e.callcommand('x_rfl_getfile', {
209 209 'file': idmap[m],
210 210 'node': m[-40:]
211 211 }))
212 212
213 213 for i, m in enumerate(missed):
214 214 r = futures[i].result()
215 215 futures[i] = None # release memory
216 216 file_ = idmap[m]
217 217 node = m[-40:]
218 218 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
219 219 progresstick()
220 220
221 221 def _getfiles_optimistic(
222 222 remote, receivemissing, progresstick, missed, idmap, step):
223 223 remote._callstream("x_rfl_getfiles")
224 224 i = 0
225 225 pipeo = remote._pipeo
226 226 pipei = remote._pipei
227 227 while i < len(missed):
228 228 # issue a batch of requests
229 229 start = i
230 230 end = min(len(missed), start + step)
231 231 i = end
232 232 for missingid in missed[start:end]:
233 233 # issue new request
234 234 versionid = missingid[-40:]
235 235 file = idmap[missingid]
236 236 sshrequest = "%s%s\n" % (versionid, file)
237 237 pipeo.write(sshrequest)
238 238 pipeo.flush()
239 239
240 240 # receive batch results
241 241 for missingid in missed[start:end]:
242 242 versionid = missingid[-40:]
243 243 file = idmap[missingid]
244 244 receivemissing(pipei, file, versionid)
245 245 progresstick()
246 246
247 247 # End the command
248 248 pipeo.write('\n')
249 249 pipeo.flush()
250 250
251 251 def _getfiles_threaded(
252 252 remote, receivemissing, progresstick, missed, idmap, step):
253 253 remote._callstream("getfiles")
254 254 pipeo = remote._pipeo
255 255 pipei = remote._pipei
256 256
257 257 def writer():
258 258 for missingid in missed:
259 259 versionid = missingid[-40:]
260 260 file = idmap[missingid]
261 261 sshrequest = "%s%s\n" % (versionid, file)
262 262 pipeo.write(sshrequest)
263 263 pipeo.flush()
264 264 writerthread = threading.Thread(target=writer)
265 265 writerthread.daemon = True
266 266 writerthread.start()
267 267
268 268 for missingid in missed:
269 269 versionid = missingid[-40:]
270 270 file = idmap[missingid]
271 271 receivemissing(pipei, file, versionid)
272 272 progresstick()
273 273
274 274 writerthread.join()
275 275 # End the command
276 276 pipeo.write('\n')
277 277 pipeo.flush()
278 278
279 279 class fileserverclient(object):
280 280 """A client for requesting files from the remote file server.
281 281 """
282 282 def __init__(self, repo):
283 283 ui = repo.ui
284 284 self.repo = repo
285 285 self.ui = ui
286 286 self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
287 287 if self.cacheprocess:
288 288 self.cacheprocess = util.expandpath(self.cacheprocess)
289 289
290 290 # This option causes remotefilelog to pass the full file path to the
291 291 # cacheprocess instead of a hashed key.
292 292 self.cacheprocesspasspath = ui.configbool(
293 293 "remotefilelog", "cacheprocess.includepath")
294 294
295 295 self.debugoutput = ui.configbool("remotefilelog", "debug")
296 296
297 297 self.remotecache = cacheconnection()
298 298
299 299 def setstore(self, datastore, historystore, writedata, writehistory):
300 300 self.datastore = datastore
301 301 self.historystore = historystore
302 302 self.writedata = writedata
303 303 self.writehistory = writehistory
304 304
305 305 def _connect(self):
306 306 return self.repo.connectionpool.get(self.repo.fallbackpath)
307 307
308 308 def request(self, fileids):
309 309 """Takes a list of filename/node pairs and fetches them from the
310 310 server. Files are stored in the local cache.
311 311 A list of nodes that the server couldn't find is returned.
312 312 If the connection fails, an exception is raised.
313 313 """
314 314 if not self.remotecache.connected:
315 315 self.connect()
316 316 cache = self.remotecache
317 317 writedata = self.writedata
318 318
319 319 repo = self.repo
320 320 total = len(fileids)
321 321 request = "get\n%d\n" % total
322 322 idmap = {}
323 323 reponame = repo.name
324 324 for file, id in fileids:
325 325 fullid = getcachekey(reponame, file, id)
326 326 if self.cacheprocesspasspath:
327 327 request += file + '\0'
328 328 request += fullid + "\n"
329 329 idmap[fullid] = file
330 330
331 331 cache.request(request)
332 332
333 333 progress = self.ui.makeprogress(_('downloading'), total=total)
334 334 progress.update(0)
335 335
336 336 missed = []
337 337 while True:
338 338 missingid = cache.receiveline()
339 339 if not missingid:
340 340 missedset = set(missed)
341 341 for missingid in idmap:
342 342 if not missingid in missedset:
343 343 missed.append(missingid)
344 344 self.ui.warn(_("warning: cache connection closed early - " +
345 345 "falling back to server\n"))
346 346 break
347 347 if missingid == "0":
348 348 break
349 349 if missingid.startswith("_hits_"):
350 350 # receive progress reports
351 351 parts = missingid.split("_")
352 352 progress.increment(int(parts[2]))
353 353 continue
354 354
355 355 missed.append(missingid)
356 356
357 357 global fetchmisses
358 358 fetchmisses += len(missed)
359 359
360 360 fromcache = total - len(missed)
361 361 progress.update(fromcache, total=total)
362 362 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
363 363 fromcache, total, hit=fromcache, total=total)
364 364
365 365 oldumask = os.umask(0o002)
366 366 try:
367 367 # receive cache misses from master
368 368 if missed:
369 369 # When verbose is true, sshpeer prints 'running ssh...'
370 370 # to stdout, which can interfere with some command
371 371 # outputs
372 372 verbose = self.ui.verbose
373 373 self.ui.verbose = False
374 374 try:
375 375 with self._connect() as conn:
376 376 remote = conn.peer
377 377 if remote.capable(
378 378 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
379 379 if not isinstance(remote, _sshv1peer):
380 380 raise error.Abort('remotefilelog requires ssh '
381 381 'servers')
382 382 step = self.ui.configint('remotefilelog',
383 383 'getfilesstep')
384 384 getfilestype = self.ui.config('remotefilelog',
385 385 'getfilestype')
386 386 if getfilestype == 'threaded':
387 387 _getfiles = _getfiles_threaded
388 388 else:
389 389 _getfiles = _getfiles_optimistic
390 390 _getfiles(remote, self.receivemissing,
391 391 progress.increment, missed, idmap, step)
392 392 elif remote.capable("x_rfl_getfile"):
393 393 if remote.capable('batch'):
394 394 batchdefault = 100
395 395 else:
396 396 batchdefault = 10
397 397 batchsize = self.ui.configint(
398 398 'remotefilelog', 'batchsize', batchdefault)
399 399 self.ui.debug(
400 400 b'requesting %d files from '
401 401 b'remotefilelog server...\n' % len(missed))
402 402 _getfilesbatch(
403 403 remote, self.receivemissing, progress.increment,
404 404 missed, idmap, batchsize)
405 405 else:
406 406 raise error.Abort("configured remotefilelog server"
407 407 " does not support remotefilelog")
408 408
409 409 self.ui.log("remotefilefetchlog",
410 410 "Success\n",
411 411 fetched_files = progress.pos - fromcache,
412 412 total_to_fetch = total - fromcache)
413 413 except Exception:
414 414 self.ui.log("remotefilefetchlog",
415 415 "Fail\n",
416 416 fetched_files = progress.pos - fromcache,
417 417 total_to_fetch = total - fromcache)
418 418 raise
419 419 finally:
420 420 self.ui.verbose = verbose
421 421 # send to memcache
422 422 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed))
423 423 cache.request(request)
424 424
425 425 progress.complete()
426 426
427 427 # mark ourselves as a user of this cache
428 428 writedata.markrepo(self.repo.path)
429 429 finally:
430 430 os.umask(oldumask)
431 431
432 432 def receivemissing(self, pipe, filename, node):
433 433 line = pipe.readline()[:-1]
434 434 if not line:
435 435 raise error.ResponseError(_("error downloading file contents:"),
436 436 _("connection closed early"))
437 437 size = int(line)
438 438 data = pipe.read(size)
439 439 if len(data) != size:
440 440 raise error.ResponseError(_("error downloading file contents:"),
441 441 _("only received %s of %s bytes")
442 442 % (len(data), size))
443 443
444 444 self.writedata.addremotefilelognode(filename, bin(node),
445 445 zlib.decompress(data))
446 446
447 447 def connect(self):
448 448 if self.cacheprocess:
449 449 cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
450 450 self.remotecache.connect(cmd)
451 451 else:
452 452 # If no cache process is specified, we fake one that always
453 453 # returns cache misses. This enables tests to run easily
454 454 # and may eventually allow us to be a drop in replacement
455 455 # for the largefiles extension.
456 456 class simplecache(object):
457 457 def __init__(self):
458 458 self.missingids = []
459 459 self.connected = True
460 460
461 461 def close(self):
462 462 pass
463 463
464 464 def request(self, value, flush=True):
465 465 lines = value.split("\n")
466 466 if lines[0] != "get":
467 467 return
468 468 self.missingids = lines[2:-1]
469 469 self.missingids.append('0')
470 470
471 471 def receiveline(self):
472 472 if len(self.missingids) > 0:
473 473 return self.missingids.pop(0)
474 474 return None
475 475
476 476 self.remotecache = simplecache()
477 477
478 478 def close(self):
479 479 if fetches:
480 480 msg = ("%d files fetched over %d fetches - " +
481 481 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
482 482 fetched,
483 483 fetches,
484 484 fetchmisses,
485 485 float(fetched - fetchmisses) / float(fetched) * 100.0,
486 486 fetchcost)
487 487 if self.debugoutput:
488 488 self.ui.warn(msg)
489 489 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
490 490 remotefilelogfetched=fetched,
491 491 remotefilelogfetches=fetches,
492 492 remotefilelogfetchmisses=fetchmisses,
493 493 remotefilelogfetchtime=fetchcost * 1000)
494 494
495 495 if self.remotecache.connected:
496 496 self.remotecache.close()
497 497
498 498 def prefetch(self, fileids, force=False, fetchdata=True,
499 499 fetchhistory=False):
500 500 """downloads the given file versions to the cache
501 501 """
502 502 repo = self.repo
503 503 idstocheck = []
504 504 for file, id in fileids:
505 505 # hack
506 506 # - we don't use .hgtags
507 507 # - workingctx produces ids with length 42,
508 508 # which we skip since they aren't in any cache
509 509 if (file == '.hgtags' or len(id) == 42
510 510 or not repo.shallowmatch(file)):
511 511 continue
512 512
513 513 idstocheck.append((file, bin(id)))
514 514
515 515 datastore = self.datastore
516 516 historystore = self.historystore
517 517 if force:
518 518 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
519 519 historystore = metadatastore.unionmetadatastore(
520 520 *repo.sharedhistorystores)
521 521
522 522 missingids = set()
523 523 if fetchdata:
524 524 missingids.update(datastore.getmissing(idstocheck))
525 525 if fetchhistory:
526 526 missingids.update(historystore.getmissing(idstocheck))
527 527
528 528 # partition missing nodes into nullid and not-nullid so we can
529 529 # warn about this filtering potentially shadowing bugs.
530 530 nullids = len([None for unused, id in missingids if id == nullid])
531 531 if nullids:
532 532 missingids = [(f, id) for f, id in missingids if id != nullid]
533 533 repo.ui.develwarn(
534 534 ('remotefilelog not fetching %d null revs'
535 535 ' - this is likely hiding bugs' % nullids),
536 536 config='remotefilelog-ext')
537 537 if missingids:
538 538 global fetches, fetched, fetchcost
539 539 fetches += 1
540 540
541 541 # We want to be able to detect excess individual file downloads, so
542 542 # let's log that information for debugging.
543 543 if fetches >= 15 and fetches < 18:
544 544 if fetches == 15:
545 545 fetchwarning = self.ui.config('remotefilelog',
546 546 'fetchwarning')
547 547 if fetchwarning:
548 548 self.ui.warn(fetchwarning + '\n')
549 549 self.logstacktrace()
550 550 missingids = [(file, hex(id)) for file, id in sorted(missingids)]
551 551 fetched += len(missingids)
552 552 start = time.time()
553 553 missingids = self.request(missingids)
554 554 if missingids:
555 555 raise error.Abort(_("unable to download %d files") %
556 556 len(missingids))
557 557 fetchcost += time.time() - start
558 558 self._lfsprefetch(fileids)
559 559
560 560 def _lfsprefetch(self, fileids):
561 561 if not _lfsmod or not util.safehasattr(
562 562 self.repo.svfs, 'lfslocalblobstore'):
563 563 return
564 564 if not _lfsmod.wrapper.candownload(self.repo):
565 565 return
566 566 pointers = []
567 567 store = self.repo.svfs.lfslocalblobstore
568 568 for file, id in fileids:
569 569 node = bin(id)
570 570 rlog = self.repo.file(file)
571 571 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
572 text = rlog.revision(node, raw=True)
572 text = rlog.rawdata(node)
573 573 p = _lfsmod.pointer.deserialize(text)
574 574 oid = p.oid()
575 575 if not store.has(oid):
576 576 pointers.append(p)
577 577 if len(pointers) > 0:
578 578 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
579 579 assert all(store.has(p.oid()) for p in pointers)
580 580
581 581 def logstacktrace(self):
582 582 import traceback
583 583 self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
584 584 ''.join(traceback.format_stack()))
hgext/remotefilelog/remotefilelog.py
@@ -1,462 +1,462 b''
1 1 # remotefilelog.py - filelog implementation where filelog history is stored
2 2 # remotely
3 3 #
4 4 # Copyright 2013 Facebook, Inc.
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8 from __future__ import absolute_import
9 9
10 10 import collections
11 11 import os
12 12
13 13 from mercurial.node import (
14 14 bin,
15 15 nullid,
16 16 wdirfilenodeids,
17 17 wdirid,
18 18 )
19 19 from mercurial.i18n import _
20 20 from mercurial import (
21 21 ancestor,
22 22 error,
23 23 mdiff,
24 24 revlog,
25 25 )
26 26 from mercurial.utils import storageutil
27 27
28 28 from . import (
29 29 constants,
30 30 fileserverclient,
31 31 shallowutil,
32 32 )
33 33
34 34 class remotefilelognodemap(object):
35 35 def __init__(self, filename, store):
36 36 self._filename = filename
37 37 self._store = store
38 38
39 39 def __contains__(self, node):
40 40 missing = self._store.getmissing([(self._filename, node)])
41 41 return not bool(missing)
42 42
43 43 def __get__(self, node):
44 44 if node not in self:
45 45 raise KeyError(node)
46 46 return node
47 47
48 48 class remotefilelog(object):
49 49
50 50 _generaldelta = True
51 51
52 52 def __init__(self, opener, path, repo):
53 53 self.opener = opener
54 54 self.filename = path
55 55 self.repo = repo
56 56 self.nodemap = remotefilelognodemap(self.filename, repo.contentstore)
57 57
58 58 self.version = 1
59 59
60 60 def read(self, node):
61 61 """returns the file contents at this node"""
62 62 t = self.revision(node)
63 63 if not t.startswith('\1\n'):
64 64 return t
65 65 s = t.index('\1\n', 2)
66 66 return t[s + 2:]
67 67
68 68 def add(self, text, meta, transaction, linknode, p1=None, p2=None):
69 69 # hash with the metadata, like in vanilla filelogs
70 70 hashtext = shallowutil.createrevlogtext(text, meta.get('copy'),
71 71 meta.get('copyrev'))
72 72 node = storageutil.hashrevisionsha1(hashtext, p1, p2)
73 73 return self.addrevision(hashtext, transaction, linknode, p1, p2,
74 74 node=node)
75 75
76 76 def _createfileblob(self, text, meta, flags, p1, p2, node, linknode):
77 77 # text passed to "_createfileblob" does not include filelog metadata
78 78 header = shallowutil.buildfileblobheader(len(text), flags)
79 79 data = "%s\0%s" % (header, text)
80 80
81 81 realp1 = p1
82 82 copyfrom = ""
83 83 if meta and 'copy' in meta:
84 84 copyfrom = meta['copy']
85 85 realp1 = bin(meta['copyrev'])
86 86
87 87 data += "%s%s%s%s%s\0" % (node, realp1, p2, linknode, copyfrom)
88 88
89 89 visited = set()
90 90
91 91 pancestors = {}
92 92 queue = []
93 93 if realp1 != nullid:
94 94 p1flog = self
95 95 if copyfrom:
96 96 p1flog = remotefilelog(self.opener, copyfrom, self.repo)
97 97
98 98 pancestors.update(p1flog.ancestormap(realp1))
99 99 queue.append(realp1)
100 100 visited.add(realp1)
101 101 if p2 != nullid:
102 102 pancestors.update(self.ancestormap(p2))
103 103 queue.append(p2)
104 104 visited.add(p2)
105 105
106 106 ancestortext = ""
107 107
108 108 # add the ancestors in topological order
109 109 while queue:
110 110 c = queue.pop(0)
111 111 pa1, pa2, ancestorlinknode, pacopyfrom = pancestors[c]
112 112
113 113 pacopyfrom = pacopyfrom or ''
114 114 ancestortext += "%s%s%s%s%s\0" % (
115 115 c, pa1, pa2, ancestorlinknode, pacopyfrom)
116 116
117 117 if pa1 != nullid and pa1 not in visited:
118 118 queue.append(pa1)
119 119 visited.add(pa1)
120 120 if pa2 != nullid and pa2 not in visited:
121 121 queue.append(pa2)
122 122 visited.add(pa2)
123 123
124 124 data += ancestortext
125 125
126 126 return data
127 127
128 128 def addrevision(self, text, transaction, linknode, p1, p2, cachedelta=None,
129 129 node=None, flags=revlog.REVIDX_DEFAULT_FLAGS):
130 130 # text passed to "addrevision" includes hg filelog metadata header
131 131 if node is None:
132 132 node = storageutil.hashrevisionsha1(text, p1, p2)
133 133
134 134 meta, metaoffset = storageutil.parsemeta(text)
135 135 rawtext, validatehash = self._processflags(text, flags, 'write')
136 136 return self.addrawrevision(rawtext, transaction, linknode, p1, p2,
137 137 node, flags, cachedelta,
138 138 _metatuple=(meta, metaoffset))
139 139
140 140 def addrawrevision(self, rawtext, transaction, linknode, p1, p2, node,
141 141 flags, cachedelta=None, _metatuple=None):
142 142 if _metatuple:
143 143 # _metatuple: used by "addrevision" internally by remotefilelog
144 144 # meta was parsed confidently
145 145 meta, metaoffset = _metatuple
146 146 else:
147 147 # not from self.addrevision, but something else (repo._filecommit)
148 148 # calls addrawrevision directly. remotefilelog needs to get and
149 149 # strip filelog metadata.
150 150 # we don't have confidence about whether rawtext contains filelog
151 151 # metadata or not (flag processor could replace it), so we just
152 152 # parse it as best-effort.
153 153 # in LFS (flags != 0)'s case, the best way is to call LFS code to
154 154 # get the meta information, instead of storageutil.parsemeta.
155 155 meta, metaoffset = storageutil.parsemeta(rawtext)
156 156 if flags != 0:
157 157 # when flags != 0, be conservative and do not mangle rawtext, since
158 158 # a read flag processor expects the text not being mangled at all.
159 159 metaoffset = 0
160 160 if metaoffset:
161 161 # remotefilelog fileblob stores copy metadata in its ancestortext,
162 162 # not its main blob. so we need to remove filelog metadata
163 163 # (containing copy information) from text.
164 164 blobtext = rawtext[metaoffset:]
165 165 else:
166 166 blobtext = rawtext
167 167 data = self._createfileblob(blobtext, meta, flags, p1, p2, node,
168 168 linknode)
169 169 self.repo.contentstore.addremotefilelognode(self.filename, node, data)
170 170
171 171 return node
172 172
173 173 def renamed(self, node):
174 174 ancestors = self.repo.metadatastore.getancestors(self.filename, node)
175 175 p1, p2, linknode, copyfrom = ancestors[node]
176 176 if copyfrom:
177 177 return (copyfrom, p1)
178 178
179 179 return False
180 180
181 181 def size(self, node):
182 182 """return the size of a given revision"""
183 183 return len(self.read(node))
184 184
185 185 rawsize = size
186 186
187 187 def cmp(self, node, text):
188 188 """compare text with a given file revision
189 189
190 190 returns True if text is different than what is stored.
191 191 """
192 192
193 193 if node == nullid:
194 194 return True
195 195
196 196 nodetext = self.read(node)
197 197 return nodetext != text
198 198
199 199 def __nonzero__(self):
200 200 return True
201 201
202 202 __bool__ = __nonzero__
203 203
204 204 def __len__(self):
205 205 if self.filename == '.hgtags':
206 206 # The length of .hgtags is used to fast path tag checking.
207 207 # remotefilelog doesn't support .hgtags since the entire .hgtags
208 208 # history is needed. Use the excludepattern setting to make
209 209 # .hgtags a normal filelog.
210 210 return 0
211 211
212 212 raise RuntimeError("len not supported")
213 213
214 214 def empty(self):
215 215 return False
216 216
217 217 def flags(self, node):
218 218 if isinstance(node, int):
219 219 raise error.ProgrammingError(
220 220 'remotefilelog does not accept integer rev for flags')
221 221 store = self.repo.contentstore
222 222 return store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
223 223
224 224 def parents(self, node):
225 225 if node == nullid:
226 226 return nullid, nullid
227 227
228 228 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
229 229 p1, p2, linknode, copyfrom = ancestormap[node]
230 230 if copyfrom:
231 231 p1 = nullid
232 232
233 233 return p1, p2
234 234
235 235 def parentrevs(self, rev):
236 236 # TODO(augie): this is a node and should be a rev, but for now
237 237 # nothing in core seems to actually break.
238 238 return self.parents(rev)
239 239
240 240 def linknode(self, node):
241 241 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
242 242 p1, p2, linknode, copyfrom = ancestormap[node]
243 243 return linknode
244 244
245 245 def linkrev(self, node):
246 246 return self.repo.unfiltered().changelog.rev(self.linknode(node))
247 247
248 248 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
249 249 assumehaveparentrevisions=False, deltaprevious=False,
250 250 deltamode=None):
251 251 # we don't use any of these parameters here
252 252 del nodesorder, revisiondata, assumehaveparentrevisions, deltaprevious
253 253 del deltamode
254 254 prevnode = None
255 255 for node in nodes:
256 256 p1, p2 = self.parents(node)
257 257 if prevnode is None:
258 258 basenode = prevnode = p1
259 259 if basenode == node:
260 260 basenode = nullid
261 261 if basenode != nullid:
262 262 revision = None
263 263 delta = self.revdiff(basenode, node)
264 264 else:
265 revision = self.revision(node, raw=True)
265 revision = self.rawdata(node)
266 266 delta = None
267 267 yield revlog.revlogrevisiondelta(
268 268 node=node,
269 269 p1node=p1,
270 270 p2node=p2,
271 271 linknode=self.linknode(node),
272 272 basenode=basenode,
273 273 flags=self.flags(node),
274 274 baserevisionsize=None,
275 275 revision=revision,
276 276 delta=delta,
277 277 )
278 278
279 279 def revdiff(self, node1, node2):
280 return mdiff.textdiff(self.revision(node1, raw=True),
281 self.revision(node2, raw=True))
280 return mdiff.textdiff(self.rawdata(node1),
281 self.rawdata(node2))
282 282
283 283 def lookup(self, node):
284 284 if len(node) == 40:
285 285 node = bin(node)
286 286 if len(node) != 20:
287 287 raise error.LookupError(node, self.filename,
288 288 _('invalid lookup input'))
289 289
290 290 return node
291 291
292 292 def rev(self, node):
293 293 # This is a hack to make TortoiseHG work.
294 294 return node
295 295
296 296 def node(self, rev):
297 297 # This is a hack.
298 298 if isinstance(rev, int):
299 299 raise error.ProgrammingError(
300 300 'remotefilelog does not convert integer rev to node')
301 301 return rev
302 302
303 303 def revision(self, node, raw=False):
304 304 """returns the revlog contents at this node.
305 305 this includes the meta data traditionally included in file revlogs.
306 306 this is generally only used for bundling and communicating with vanilla
307 307 hg clients.
308 308 """
309 309 if node == nullid:
310 310 return ""
311 311 if len(node) != 20:
312 312 raise error.LookupError(node, self.filename,
313 313 _('invalid revision input'))
314 314 if node == wdirid or node in wdirfilenodeids:
315 315 raise error.WdirUnsupported
316 316
317 317 store = self.repo.contentstore
318 318 rawtext = store.get(self.filename, node)
319 319 if raw:
320 320 return rawtext
321 321 flags = store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
322 322 if flags == 0:
323 323 return rawtext
324 324 text, verifyhash = self._processflags(rawtext, flags, 'read')
325 325 return text
326 326
327 327 def rawdata(self, node):
328 328 return self.revision(node, raw=False)
329 329
330 330 def _processflags(self, text, flags, operation, raw=False):
331 331 # mostly copied from hg/mercurial/revlog.py
332 332 validatehash = True
333 333 orderedflags = revlog.REVIDX_FLAGS_ORDER
334 334 if operation == 'write':
335 335 orderedflags = reversed(orderedflags)
336 336 for flag in orderedflags:
337 337 if flag & flags:
338 338 vhash = True
339 339 if flag not in revlog._flagprocessors:
340 340 message = _("missing processor for flag '%#x'") % (flag)
341 341 raise revlog.RevlogError(message)
342 342 readfunc, writefunc, rawfunc = revlog._flagprocessors[flag]
343 343 if raw:
344 344 vhash = rawfunc(self, text)
345 345 elif operation == 'read':
346 346 text, vhash = readfunc(self, text)
347 347 elif operation == 'write':
348 348 text, vhash = writefunc(self, text)
349 349 validatehash = validatehash and vhash
350 350 return text, validatehash
351 351
352 352 def _read(self, id):
353 353 """reads the raw file blob from disk, cache, or server"""
354 354 fileservice = self.repo.fileservice
355 355 localcache = fileservice.localcache
356 356 cachekey = fileserverclient.getcachekey(self.repo.name, self.filename,
357 357 id)
358 358 try:
359 359 return localcache.read(cachekey)
360 360 except KeyError:
361 361 pass
362 362
363 363 localkey = fileserverclient.getlocalkey(self.filename, id)
364 364 localpath = os.path.join(self.localpath, localkey)
365 365 try:
366 366 return shallowutil.readfile(localpath)
367 367 except IOError:
368 368 pass
369 369
370 370 fileservice.prefetch([(self.filename, id)])
371 371 try:
372 372 return localcache.read(cachekey)
373 373 except KeyError:
374 374 pass
375 375
376 376 raise error.LookupError(id, self.filename, _('no node'))
377 377
378 378 def ancestormap(self, node):
379 379 return self.repo.metadatastore.getancestors(self.filename, node)
380 380
381 381 def ancestor(self, a, b):
382 382 if a == nullid or b == nullid:
383 383 return nullid
384 384
385 385 revmap, parentfunc = self._buildrevgraph(a, b)
386 386 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
387 387
388 388 ancs = ancestor.ancestors(parentfunc, revmap[a], revmap[b])
389 389 if ancs:
390 390 # choose a consistent winner when there's a tie
391 391 return min(map(nodemap.__getitem__, ancs))
392 392 return nullid
393 393
394 394 def commonancestorsheads(self, a, b):
395 395 """calculate all the heads of the common ancestors of nodes a and b"""
396 396
397 397 if a == nullid or b == nullid:
398 398 return nullid
399 399
400 400 revmap, parentfunc = self._buildrevgraph(a, b)
401 401 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
402 402
403 403 ancs = ancestor.commonancestorsheads(parentfunc, revmap[a], revmap[b])
404 404 return map(nodemap.__getitem__, ancs)
405 405
406 406 def _buildrevgraph(self, a, b):
407 407 """Builds a numeric revision graph for the given two nodes.
408 408 Returns a node->rev map and a rev->[revs] parent function.
409 409 """
410 410 amap = self.ancestormap(a)
411 411 bmap = self.ancestormap(b)
412 412
413 413 # Union the two maps
414 414 parentsmap = collections.defaultdict(list)
415 415 allparents = set()
416 416 for mapping in (amap, bmap):
417 417 for node, pdata in mapping.iteritems():
418 418 parents = parentsmap[node]
419 419 p1, p2, linknode, copyfrom = pdata
420 420 # Don't follow renames (copyfrom).
421 421 # remotefilectx.ancestor does that.
422 422 if p1 != nullid and not copyfrom:
423 423 parents.append(p1)
424 424 allparents.add(p1)
425 425 if p2 != nullid:
426 426 parents.append(p2)
427 427 allparents.add(p2)
428 428
429 429 # Breadth first traversal to build linkrev graph
430 430 parentrevs = collections.defaultdict(list)
431 431 revmap = {}
432 432 queue = collections.deque(((None, n) for n in parentsmap
433 433 if n not in allparents))
434 434 while queue:
435 435 prevrev, current = queue.pop()
436 436 if current in revmap:
437 437 if prevrev:
438 438 parentrevs[prevrev].append(revmap[current])
439 439 continue
440 440
441 441 # Assign linkrevs in reverse order, so start at
442 442 # len(parentsmap) and work backwards.
443 443 currentrev = len(parentsmap) - len(revmap) - 1
444 444 revmap[current] = currentrev
445 445
446 446 if prevrev:
447 447 parentrevs[prevrev].append(currentrev)
448 448
449 449 for parent in parentsmap.get(current):
450 450 queue.appendleft((currentrev, parent))
451 451
452 452 return revmap, parentrevs.__getitem__
453 453
454 454 def strip(self, minlink, transaction):
455 455 pass
456 456
457 457 # misc unused things
458 458 def files(self):
459 459 return []
460 460
461 461 def checksize(self):
462 462 return 0, 0
hgext/remotefilelog/remotefilelogserver.py
@@ -1,404 +1,404 b''
1 1 # remotefilelogserver.py - server logic for a remotefilelog server
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import errno
10 10 import os
11 11 import stat
12 12 import time
13 13 import zlib
14 14
15 15 from mercurial.i18n import _
16 16 from mercurial.node import bin, hex, nullid
17 17 from mercurial import (
18 18 changegroup,
19 19 changelog,
20 20 context,
21 21 error,
22 22 extensions,
23 23 match,
24 24 store,
25 25 streamclone,
26 26 util,
27 27 wireprotoserver,
28 28 wireprototypes,
29 29 wireprotov1server,
30 30 )
31 31 from . import (
32 32 constants,
33 33 shallowutil,
34 34 )
35 35
36 36 _sshv1server = wireprotoserver.sshv1protocolhandler
37 37
38 38 def setupserver(ui, repo):
39 39 """Sets up a normal Mercurial repo so it can serve files to shallow repos.
40 40 """
41 41 onetimesetup(ui)
42 42
43 43 # don't send files to shallow clients during pulls
44 44 def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source,
45 45 *args, **kwargs):
46 46 caps = self._bundlecaps or []
47 47 if constants.BUNDLE2_CAPABLITY in caps:
48 48 # only send files that don't match the specified patterns
49 49 includepattern = None
50 50 excludepattern = None
51 51 for cap in (self._bundlecaps or []):
52 52 if cap.startswith("includepattern="):
53 53 includepattern = cap[len("includepattern="):].split('\0')
54 54 elif cap.startswith("excludepattern="):
55 55 excludepattern = cap[len("excludepattern="):].split('\0')
56 56
57 57 m = match.always()
58 58 if includepattern or excludepattern:
59 59 m = match.match(repo.root, '', None,
60 60 includepattern, excludepattern)
61 61
62 62 changedfiles = list([f for f in changedfiles if not m(f)])
63 63 return orig(self, changedfiles, linknodes, commonrevs, source,
64 64 *args, **kwargs)
65 65
66 66 extensions.wrapfunction(
67 67 changegroup.cgpacker, 'generatefiles', generatefiles)
68 68
69 69 onetime = False
70 70 def onetimesetup(ui):
71 71 """Configures the wireprotocol for both clients and servers.
72 72 """
73 73 global onetime
74 74 if onetime:
75 75 return
76 76 onetime = True
77 77
78 78 # support file content requests
79 79 wireprotov1server.wireprotocommand(
80 80 'x_rfl_getflogheads', 'path', permission='pull')(getflogheads)
81 81 wireprotov1server.wireprotocommand(
82 82 'x_rfl_getfiles', '', permission='pull')(getfiles)
83 83 wireprotov1server.wireprotocommand(
84 84 'x_rfl_getfile', 'file node', permission='pull')(getfile)
85 85
86 86 class streamstate(object):
87 87 match = None
88 88 shallowremote = False
89 89 noflatmf = False
90 90 state = streamstate()
91 91
92 92 def stream_out_shallow(repo, proto, other):
93 93 includepattern = None
94 94 excludepattern = None
95 95 raw = other.get('includepattern')
96 96 if raw:
97 97 includepattern = raw.split('\0')
98 98 raw = other.get('excludepattern')
99 99 if raw:
100 100 excludepattern = raw.split('\0')
101 101
102 102 oldshallow = state.shallowremote
103 103 oldmatch = state.match
104 104 oldnoflatmf = state.noflatmf
105 105 try:
106 106 state.shallowremote = True
107 107 state.match = match.always()
108 108 state.noflatmf = other.get('noflatmanifest') == 'True'
109 109 if includepattern or excludepattern:
110 110 state.match = match.match(repo.root, '', None,
111 111 includepattern, excludepattern)
112 112 streamres = wireprotov1server.stream(repo, proto)
113 113
114 114 # Force the first value to execute, so the file list is computed
115 115 # within the try/finally scope
116 116 first = next(streamres.gen)
117 117 second = next(streamres.gen)
118 118 def gen():
119 119 yield first
120 120 yield second
121 121 for value in streamres.gen:
122 122 yield value
123 123 return wireprototypes.streamres(gen())
124 124 finally:
125 125 state.shallowremote = oldshallow
126 126 state.match = oldmatch
127 127 state.noflatmf = oldnoflatmf
128 128
129 129 wireprotov1server.commands['stream_out_shallow'] = (stream_out_shallow, '*')
130 130
131 131 # don't clone filelogs to shallow clients
132 132 def _walkstreamfiles(orig, repo, matcher=None):
133 133 if state.shallowremote:
134 134 # if we are shallow ourselves, stream our local commits
135 135 if shallowutil.isenabled(repo):
136 136 striplen = len(repo.store.path) + 1
137 137 readdir = repo.store.rawvfs.readdir
138 138 visit = [os.path.join(repo.store.path, 'data')]
139 139 while visit:
140 140 p = visit.pop()
141 141 for f, kind, st in readdir(p, stat=True):
142 142 fp = p + '/' + f
143 143 if kind == stat.S_IFREG:
144 144 if not fp.endswith('.i') and not fp.endswith('.d'):
145 145 n = util.pconvert(fp[striplen:])
146 146 yield (store.decodedir(n), n, st.st_size)
147 147 if kind == stat.S_IFDIR:
148 148 visit.append(fp)
149 149
150 150 if 'treemanifest' in repo.requirements:
151 151 for (u, e, s) in repo.store.datafiles():
152 152 if (u.startswith('meta/') and
153 153 (u.endswith('.i') or u.endswith('.d'))):
154 154 yield (u, e, s)
155 155
156 156 # Return .d and .i files that do not match the shallow pattern
157 157 match = state.match
158 158 if match and not match.always():
159 159 for (u, e, s) in repo.store.datafiles():
160 160 f = u[5:-2] # trim data/... and .i/.d
161 161 if not state.match(f):
162 162 yield (u, e, s)
163 163
164 164 for x in repo.store.topfiles():
165 165 if state.noflatmf and x[0][:11] == '00manifest.':
166 166 continue
167 167 yield x
168 168
169 169 elif shallowutil.isenabled(repo):
170 170 # don't allow cloning from a shallow repo to a full repo
171 171 # since it would require fetching every version of every
172 172 # file in order to create the revlogs.
173 173 raise error.Abort(_("Cannot clone from a shallow repo "
174 174 "to a full repo."))
175 175 else:
176 176 for x in orig(repo, matcher):
177 177 yield x
178 178
179 179 extensions.wrapfunction(streamclone, '_walkstreamfiles', _walkstreamfiles)
180 180
181 181 # expose remotefilelog capabilities
182 182 def _capabilities(orig, repo, proto):
183 183 caps = orig(repo, proto)
184 184 if (shallowutil.isenabled(repo) or ui.configbool('remotefilelog',
185 185 'server')):
186 186 if isinstance(proto, _sshv1server):
187 187 # legacy getfiles method which only works over ssh
188 188 caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
189 189 caps.append('x_rfl_getflogheads')
190 190 caps.append('x_rfl_getfile')
191 191 return caps
192 192 extensions.wrapfunction(wireprotov1server, '_capabilities', _capabilities)
193 193
194 194 def _adjustlinkrev(orig, self, *args, **kwargs):
195 195 # When generating file blobs, taking the real path is too slow on large
196 196 # repos, so force it to just return the linkrev directly.
197 197 repo = self._repo
198 198 if util.safehasattr(repo, 'forcelinkrev') and repo.forcelinkrev:
199 199 return self._filelog.linkrev(self._filelog.rev(self._filenode))
200 200 return orig(self, *args, **kwargs)
201 201
202 202 extensions.wrapfunction(
203 203 context.basefilectx, '_adjustlinkrev', _adjustlinkrev)
204 204
205 205 def _iscmd(orig, cmd):
206 206 if cmd == 'x_rfl_getfiles':
207 207 return False
208 208 return orig(cmd)
209 209
210 210 extensions.wrapfunction(wireprotoserver, 'iscmd', _iscmd)
211 211
212 212 def _loadfileblob(repo, cachepath, path, node):
213 213 filecachepath = os.path.join(cachepath, path, hex(node))
214 214 if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
215 215 filectx = repo.filectx(path, fileid=node)
216 216 if filectx.node() == nullid:
217 217 repo.changelog = changelog.changelog(repo.svfs)
218 218 filectx = repo.filectx(path, fileid=node)
219 219
220 220 text = createfileblob(filectx)
221 221 # TODO configurable compression engines
222 222 text = zlib.compress(text)
223 223
224 224 # everything should be user & group read/writable
225 225 oldumask = os.umask(0o002)
226 226 try:
227 227 dirname = os.path.dirname(filecachepath)
228 228 if not os.path.exists(dirname):
229 229 try:
230 230 os.makedirs(dirname)
231 231 except OSError as ex:
232 232 if ex.errno != errno.EEXIST:
233 233 raise
234 234
235 235 f = None
236 236 try:
237 237 f = util.atomictempfile(filecachepath, "wb")
238 238 f.write(text)
239 239 except (IOError, OSError):
240 240 # Don't abort if the user only has permission to read,
241 241 # and not write.
242 242 pass
243 243 finally:
244 244 if f:
245 245 f.close()
246 246 finally:
247 247 os.umask(oldumask)
248 248 else:
249 249 with open(filecachepath, "rb") as f:
250 250 text = f.read()
251 251 return text
252 252
253 253 def getflogheads(repo, proto, path):
254 254 """A server api for requesting a filelog's heads
255 255 """
256 256 flog = repo.file(path)
257 257 heads = flog.heads()
258 258 return '\n'.join((hex(head) for head in heads if head != nullid))
259 259
260 260 def getfile(repo, proto, file, node):
261 261 """A server api for requesting a particular version of a file. Can be used
262 262 in batches to request many files at once. The return protocol is:
263 263 <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
264 264 non-zero for an error.
265 265
266 266 data is a compressed blob with revlog flag and ancestors information. See
267 267 createfileblob for its content.
268 268 """
269 269 if shallowutil.isenabled(repo):
270 270 return '1\0' + _('cannot fetch remote files from shallow repo')
271 271 cachepath = repo.ui.config("remotefilelog", "servercachepath")
272 272 if not cachepath:
273 273 cachepath = os.path.join(repo.path, "remotefilelogcache")
274 274 node = bin(node.strip())
275 275 if node == nullid:
276 276 return '0\0'
277 277 return '0\0' + _loadfileblob(repo, cachepath, file, node)
278 278
279 279 def getfiles(repo, proto):
280 280 """A server api for requesting particular versions of particular files.
281 281 """
282 282 if shallowutil.isenabled(repo):
283 283 raise error.Abort(_('cannot fetch remote files from shallow repo'))
284 284 if not isinstance(proto, _sshv1server):
285 285 raise error.Abort(_('cannot fetch remote files over non-ssh protocol'))
286 286
287 287 def streamer():
288 288 fin = proto._fin
289 289
290 290 cachepath = repo.ui.config("remotefilelog", "servercachepath")
291 291 if not cachepath:
292 292 cachepath = os.path.join(repo.path, "remotefilelogcache")
293 293
294 294 while True:
295 295 request = fin.readline()[:-1]
296 296 if not request:
297 297 break
298 298
299 299 node = bin(request[:40])
300 300 if node == nullid:
301 301 yield '0\n'
302 302 continue
303 303
304 304 path = request[40:]
305 305
306 306 text = _loadfileblob(repo, cachepath, path, node)
307 307
308 308 yield '%d\n%s' % (len(text), text)
309 309
310 310 # it would be better to only flush after processing a whole batch
311 311 # but currently we don't know if there are more requests coming
312 312 proto._fout.flush()
313 313 return wireprototypes.streamres(streamer())
314 314
315 315 def createfileblob(filectx):
316 316 """
317 317 format:
318 318 v0:
319 319 str(len(rawtext)) + '\0' + rawtext + ancestortext
320 320 v1:
321 321 'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
322 322 metalist := metalist + '\n' + meta | meta
323 323 meta := sizemeta | flagmeta
324 324 sizemeta := METAKEYSIZE + str(len(rawtext))
325 325 flagmeta := METAKEYFLAG + str(flag)
326 326
327 327 note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
328 328 length of 1.
329 329 """
330 330 flog = filectx.filelog()
331 331 frev = filectx.filerev()
332 332 revlogflags = flog._revlog.flags(frev)
333 333 if revlogflags == 0:
334 334 # normal files
335 335 text = filectx.data()
336 336 else:
337 337 # lfs, read raw revision data
338 text = flog.revision(frev, raw=True)
338 text = flog.rawdata(frev)
339 339
340 340 repo = filectx._repo
341 341
342 342 ancestors = [filectx]
343 343
344 344 try:
345 345 repo.forcelinkrev = True
346 346 ancestors.extend([f for f in filectx.ancestors()])
347 347
348 348 ancestortext = ""
349 349 for ancestorctx in ancestors:
350 350 parents = ancestorctx.parents()
351 351 p1 = nullid
352 352 p2 = nullid
353 353 if len(parents) > 0:
354 354 p1 = parents[0].filenode()
355 355 if len(parents) > 1:
356 356 p2 = parents[1].filenode()
357 357
358 358 copyname = ""
359 359 rename = ancestorctx.renamed()
360 360 if rename:
361 361 copyname = rename[0]
362 362 linknode = ancestorctx.node()
363 363 ancestortext += "%s%s%s%s%s\0" % (
364 364 ancestorctx.filenode(), p1, p2, linknode,
365 365 copyname)
366 366 finally:
367 367 repo.forcelinkrev = False
368 368
369 369 header = shallowutil.buildfileblobheader(len(text), revlogflags)
370 370
371 371 return "%s\0%s%s" % (header, text, ancestortext)
372 372
373 373 def gcserver(ui, repo):
374 374 if not repo.ui.configbool("remotefilelog", "server"):
375 375 return
376 376
377 377 neededfiles = set()
378 378 heads = repo.revs("heads(tip~25000:) - null")
379 379
380 380 cachepath = repo.vfs.join("remotefilelogcache")
381 381 for head in heads:
382 382 mf = repo[head].manifest()
383 383 for filename, filenode in mf.iteritems():
384 384 filecachepath = os.path.join(cachepath, filename, hex(filenode))
385 385 neededfiles.add(filecachepath)
386 386
387 387 # delete unneeded older files
388 388 days = repo.ui.configint("remotefilelog", "serverexpiration")
389 389 expiration = time.time() - (days * 24 * 60 * 60)
390 390
391 391 progress = ui.makeprogress(_("removing old server cache"), unit="files")
392 392 progress.update(0)
393 393 for root, dirs, files in os.walk(cachepath):
394 394 for file in files:
395 395 filepath = os.path.join(root, file)
396 396 progress.increment()
397 397 if filepath in neededfiles:
398 398 continue
399 399
400 400 stat = os.stat(filepath)
401 401 if stat.st_mtime < expiration:
402 402 os.remove(filepath)
403 403
404 404 progress.complete()