rawdata: update caller in remotefilelog...
Author: marmoute
Changeset: r43016:d53975bb (branch: default, phase: draft)
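This changeset migrates remotefilelog call sites from revlog.revision(node, raw=True) to the newer revlog.rawdata(node) API. Both spellings return the revision text as stored, without running flag processors, so the substitution is behavior-preserving. A minimal sketch of the equivalence, assuming `rl` is a mercurial.revlog.revlog instance (the helper name `_rawtext` is illustrative and not part of this changeset):

    def _rawtext(rl, node):
        # Prefer the dedicated raw accessor when the revlog provides it;
        # fall back to the older revision(raw=True) spelling otherwise.
        if hasattr(rl, 'rawdata'):
            return rl.rawdata(node)
        return rl.revision(node, raw=True)

The hunks below update the remotefilelog callers accordingly; the affected lines are the -/+ pairs.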
@@ -1,376 +1,376 b''
1 from __future__ import absolute_import
1 from __future__ import absolute_import
2
2
3 import threading
3 import threading
4
4
5 from mercurial.node import hex, nullid
5 from mercurial.node import hex, nullid
6 from mercurial import (
6 from mercurial import (
7 mdiff,
7 mdiff,
8 pycompat,
8 pycompat,
9 revlog,
9 revlog,
10 )
10 )
11 from . import (
11 from . import (
12 basestore,
12 basestore,
13 constants,
13 constants,
14 shallowutil,
14 shallowutil,
15 )
15 )
16
16
17 class ChainIndicies(object):
17 class ChainIndicies(object):
18 """A static class for easy reference to the delta chain indicies.
18 """A static class for easy reference to the delta chain indicies.
19 """
19 """
20 # The filename of this revision delta
20 # The filename of this revision delta
21 NAME = 0
21 NAME = 0
22 # The mercurial file node for this revision delta
22 # The mercurial file node for this revision delta
23 NODE = 1
23 NODE = 1
24 # The filename of the delta base's revision. This is useful when delta
24 # The filename of the delta base's revision. This is useful when delta
25 # between different files (like in the case of a move or copy, we can delta
25 # between different files (like in the case of a move or copy, we can delta
26 # against the original file content).
26 # against the original file content).
27 BASENAME = 2
27 BASENAME = 2
28 # The mercurial file node for the delta base revision. This is the nullid if
28 # The mercurial file node for the delta base revision. This is the nullid if
29 # this delta is a full text.
29 # this delta is a full text.
30 BASENODE = 3
30 BASENODE = 3
31 # The actual delta or full text data.
31 # The actual delta or full text data.
32 DATA = 4
32 DATA = 4
33
33
34 class unioncontentstore(basestore.baseunionstore):
34 class unioncontentstore(basestore.baseunionstore):
35 def __init__(self, *args, **kwargs):
35 def __init__(self, *args, **kwargs):
36 super(unioncontentstore, self).__init__(*args, **kwargs)
36 super(unioncontentstore, self).__init__(*args, **kwargs)
37
37
38 self.stores = args
38 self.stores = args
39 self.writestore = kwargs.get(r'writestore')
39 self.writestore = kwargs.get(r'writestore')
40
40
41 # If allowincomplete==True then the union store can return partial
41 # If allowincomplete==True then the union store can return partial
42 # delta chains, otherwise it will throw a KeyError if a full
42 # delta chains, otherwise it will throw a KeyError if a full
43 # deltachain can't be found.
43 # deltachain can't be found.
44 self.allowincomplete = kwargs.get(r'allowincomplete', False)
44 self.allowincomplete = kwargs.get(r'allowincomplete', False)
45
45
46 def get(self, name, node):
46 def get(self, name, node):
47 """Fetches the full text revision contents of the given name+node pair.
47 """Fetches the full text revision contents of the given name+node pair.
48 If the full text doesn't exist, throws a KeyError.
48 If the full text doesn't exist, throws a KeyError.
49
49
50 Under the hood, this uses getdeltachain() across all the stores to build
50 Under the hood, this uses getdeltachain() across all the stores to build
51 up a full chain to produce the full text.
51 up a full chain to produce the full text.
52 """
52 """
53 chain = self.getdeltachain(name, node)
53 chain = self.getdeltachain(name, node)
54
54
55 if chain[-1][ChainIndicies.BASENODE] != nullid:
55 if chain[-1][ChainIndicies.BASENODE] != nullid:
56 # If we didn't receive a full chain, throw
56 # If we didn't receive a full chain, throw
57 raise KeyError((name, hex(node)))
57 raise KeyError((name, hex(node)))
58
58
59 # The last entry in the chain is a full text, so we start our delta
59 # The last entry in the chain is a full text, so we start our delta
60 # applies with that.
60 # applies with that.
61 fulltext = chain.pop()[ChainIndicies.DATA]
61 fulltext = chain.pop()[ChainIndicies.DATA]
62
62
63 text = fulltext
63 text = fulltext
64 while chain:
64 while chain:
65 delta = chain.pop()[ChainIndicies.DATA]
65 delta = chain.pop()[ChainIndicies.DATA]
66 text = mdiff.patches(text, [delta])
66 text = mdiff.patches(text, [delta])
67
67
68 return text
68 return text
69
69
70 @basestore.baseunionstore.retriable
70 @basestore.baseunionstore.retriable
71 def getdelta(self, name, node):
71 def getdelta(self, name, node):
72 """Return the single delta entry for the given name/node pair.
72 """Return the single delta entry for the given name/node pair.
73 """
73 """
74 for store in self.stores:
74 for store in self.stores:
75 try:
75 try:
76 return store.getdelta(name, node)
76 return store.getdelta(name, node)
77 except KeyError:
77 except KeyError:
78 pass
78 pass
79
79
80 raise KeyError((name, hex(node)))
80 raise KeyError((name, hex(node)))
81
81
82 def getdeltachain(self, name, node):
82 def getdeltachain(self, name, node):
83 """Returns the deltachain for the given name/node pair.
83 """Returns the deltachain for the given name/node pair.
84
84
85 Returns an ordered list of:
85 Returns an ordered list of:
86
86
87 [(name, node, deltabasename, deltabasenode, deltacontent),...]
87 [(name, node, deltabasename, deltabasenode, deltacontent),...]
88
88
89 where the chain is terminated by a full text entry with a nullid
89 where the chain is terminated by a full text entry with a nullid
90 deltabasenode.
90 deltabasenode.
91 """
91 """
92 chain = self._getpartialchain(name, node)
92 chain = self._getpartialchain(name, node)
93 while chain[-1][ChainIndicies.BASENODE] != nullid:
93 while chain[-1][ChainIndicies.BASENODE] != nullid:
94 x, x, deltabasename, deltabasenode, x = chain[-1]
94 x, x, deltabasename, deltabasenode, x = chain[-1]
95 try:
95 try:
96 morechain = self._getpartialchain(deltabasename, deltabasenode)
96 morechain = self._getpartialchain(deltabasename, deltabasenode)
97 chain.extend(morechain)
97 chain.extend(morechain)
98 except KeyError:
98 except KeyError:
99 # If we allow incomplete chains, don't throw.
99 # If we allow incomplete chains, don't throw.
100 if not self.allowincomplete:
100 if not self.allowincomplete:
101 raise
101 raise
102 break
102 break
103
103
104 return chain
104 return chain
105
105
106 @basestore.baseunionstore.retriable
106 @basestore.baseunionstore.retriable
107 def getmeta(self, name, node):
107 def getmeta(self, name, node):
108 """Returns the metadata dict for given node."""
108 """Returns the metadata dict for given node."""
109 for store in self.stores:
109 for store in self.stores:
110 try:
110 try:
111 return store.getmeta(name, node)
111 return store.getmeta(name, node)
112 except KeyError:
112 except KeyError:
113 pass
113 pass
114 raise KeyError((name, hex(node)))
114 raise KeyError((name, hex(node)))
115
115
116 def getmetrics(self):
116 def getmetrics(self):
117 metrics = [s.getmetrics() for s in self.stores]
117 metrics = [s.getmetrics() for s in self.stores]
118 return shallowutil.sumdicts(*metrics)
118 return shallowutil.sumdicts(*metrics)
119
119
120 @basestore.baseunionstore.retriable
120 @basestore.baseunionstore.retriable
121 def _getpartialchain(self, name, node):
121 def _getpartialchain(self, name, node):
122 """Returns a partial delta chain for the given name/node pair.
122 """Returns a partial delta chain for the given name/node pair.
123
123
124 A partial chain is a chain that may not be terminated in a full-text.
124 A partial chain is a chain that may not be terminated in a full-text.
125 """
125 """
126 for store in self.stores:
126 for store in self.stores:
127 try:
127 try:
128 return store.getdeltachain(name, node)
128 return store.getdeltachain(name, node)
129 except KeyError:
129 except KeyError:
130 pass
130 pass
131
131
132 raise KeyError((name, hex(node)))
132 raise KeyError((name, hex(node)))
133
133
134 def add(self, name, node, data):
134 def add(self, name, node, data):
135 raise RuntimeError("cannot add content only to remotefilelog "
135 raise RuntimeError("cannot add content only to remotefilelog "
136 "contentstore")
136 "contentstore")
137
137
138 def getmissing(self, keys):
138 def getmissing(self, keys):
139 missing = keys
139 missing = keys
140 for store in self.stores:
140 for store in self.stores:
141 if missing:
141 if missing:
142 missing = store.getmissing(missing)
142 missing = store.getmissing(missing)
143 return missing
143 return missing
144
144
145 def addremotefilelognode(self, name, node, data):
145 def addremotefilelognode(self, name, node, data):
146 if self.writestore:
146 if self.writestore:
147 self.writestore.addremotefilelognode(name, node, data)
147 self.writestore.addremotefilelognode(name, node, data)
148 else:
148 else:
149 raise RuntimeError("no writable store configured")
149 raise RuntimeError("no writable store configured")
150
150
151 def markledger(self, ledger, options=None):
151 def markledger(self, ledger, options=None):
152 for store in self.stores:
152 for store in self.stores:
153 store.markledger(ledger, options)
153 store.markledger(ledger, options)
154
154
155 class remotefilelogcontentstore(basestore.basestore):
155 class remotefilelogcontentstore(basestore.basestore):
156 def __init__(self, *args, **kwargs):
156 def __init__(self, *args, **kwargs):
157 super(remotefilelogcontentstore, self).__init__(*args, **kwargs)
157 super(remotefilelogcontentstore, self).__init__(*args, **kwargs)
158 self._threaddata = threading.local()
158 self._threaddata = threading.local()
159
159
160 def get(self, name, node):
160 def get(self, name, node):
161 # return raw revision text
161 # return raw revision text
162 data = self._getdata(name, node)
162 data = self._getdata(name, node)
163
163
164 offset, size, flags = shallowutil.parsesizeflags(data)
164 offset, size, flags = shallowutil.parsesizeflags(data)
165 content = data[offset:offset + size]
165 content = data[offset:offset + size]
166
166
167 ancestormap = shallowutil.ancestormap(data)
167 ancestormap = shallowutil.ancestormap(data)
168 p1, p2, linknode, copyfrom = ancestormap[node]
168 p1, p2, linknode, copyfrom = ancestormap[node]
169 copyrev = None
169 copyrev = None
170 if copyfrom:
170 if copyfrom:
171 copyrev = hex(p1)
171 copyrev = hex(p1)
172
172
173 self._updatemetacache(node, size, flags)
173 self._updatemetacache(node, size, flags)
174
174
175 # lfs tracks renames in its own metadata, remove hg copy metadata,
175 # lfs tracks renames in its own metadata, remove hg copy metadata,
176 # because copy metadata will be re-added by lfs flag processor.
176 # because copy metadata will be re-added by lfs flag processor.
177 if flags & revlog.REVIDX_EXTSTORED:
177 if flags & revlog.REVIDX_EXTSTORED:
178 copyrev = copyfrom = None
178 copyrev = copyfrom = None
179 revision = shallowutil.createrevlogtext(content, copyfrom, copyrev)
179 revision = shallowutil.createrevlogtext(content, copyfrom, copyrev)
180 return revision
180 return revision
181
181
182 def getdelta(self, name, node):
182 def getdelta(self, name, node):
183 # Since remotefilelog content stores only contain full texts, just
183 # Since remotefilelog content stores only contain full texts, just
184 # return that.
184 # return that.
185 revision = self.get(name, node)
185 revision = self.get(name, node)
186 return revision, name, nullid, self.getmeta(name, node)
186 return revision, name, nullid, self.getmeta(name, node)
187
187
188 def getdeltachain(self, name, node):
188 def getdeltachain(self, name, node):
189 # Since remotefilelog content stores just contain full texts, we return
189 # Since remotefilelog content stores just contain full texts, we return
190 # a fake delta chain that just consists of a single full text revision.
190 # a fake delta chain that just consists of a single full text revision.
191 # The nullid in the deltabasenode slot indicates that the revision is a
191 # The nullid in the deltabasenode slot indicates that the revision is a
192 # fulltext.
192 # fulltext.
193 revision = self.get(name, node)
193 revision = self.get(name, node)
194 return [(name, node, None, nullid, revision)]
194 return [(name, node, None, nullid, revision)]
195
195
196 def getmeta(self, name, node):
196 def getmeta(self, name, node):
197 self._sanitizemetacache()
197 self._sanitizemetacache()
198 if node != self._threaddata.metacache[0]:
198 if node != self._threaddata.metacache[0]:
199 data = self._getdata(name, node)
199 data = self._getdata(name, node)
200 offset, size, flags = shallowutil.parsesizeflags(data)
200 offset, size, flags = shallowutil.parsesizeflags(data)
201 self._updatemetacache(node, size, flags)
201 self._updatemetacache(node, size, flags)
202 return self._threaddata.metacache[1]
202 return self._threaddata.metacache[1]
203
203
204 def add(self, name, node, data):
204 def add(self, name, node, data):
205 raise RuntimeError("cannot add content only to remotefilelog "
205 raise RuntimeError("cannot add content only to remotefilelog "
206 "contentstore")
206 "contentstore")
207
207
208 def _sanitizemetacache(self):
208 def _sanitizemetacache(self):
209 metacache = getattr(self._threaddata, 'metacache', None)
209 metacache = getattr(self._threaddata, 'metacache', None)
210 if metacache is None:
210 if metacache is None:
211 self._threaddata.metacache = (None, None) # (node, meta)
211 self._threaddata.metacache = (None, None) # (node, meta)
212
212
213 def _updatemetacache(self, node, size, flags):
213 def _updatemetacache(self, node, size, flags):
214 self._sanitizemetacache()
214 self._sanitizemetacache()
215 if node == self._threaddata.metacache[0]:
215 if node == self._threaddata.metacache[0]:
216 return
216 return
217 meta = {constants.METAKEYFLAG: flags,
217 meta = {constants.METAKEYFLAG: flags,
218 constants.METAKEYSIZE: size}
218 constants.METAKEYSIZE: size}
219 self._threaddata.metacache = (node, meta)
219 self._threaddata.metacache = (node, meta)
220
220
221 class remotecontentstore(object):
221 class remotecontentstore(object):
222 def __init__(self, ui, fileservice, shared):
222 def __init__(self, ui, fileservice, shared):
223 self._fileservice = fileservice
223 self._fileservice = fileservice
224 # type(shared) is usually remotefilelogcontentstore
224 # type(shared) is usually remotefilelogcontentstore
225 self._shared = shared
225 self._shared = shared
226
226
227 def get(self, name, node):
227 def get(self, name, node):
228 self._fileservice.prefetch([(name, hex(node))], force=True,
228 self._fileservice.prefetch([(name, hex(node))], force=True,
229 fetchdata=True)
229 fetchdata=True)
230 return self._shared.get(name, node)
230 return self._shared.get(name, node)
231
231
232 def getdelta(self, name, node):
232 def getdelta(self, name, node):
233 revision = self.get(name, node)
233 revision = self.get(name, node)
234 return revision, name, nullid, self._shared.getmeta(name, node)
234 return revision, name, nullid, self._shared.getmeta(name, node)
235
235
236 def getdeltachain(self, name, node):
236 def getdeltachain(self, name, node):
237 # Since our remote content stores just contain full texts, we return a
237 # Since our remote content stores just contain full texts, we return a
238 # fake delta chain that just consists of a single full text revision.
238 # fake delta chain that just consists of a single full text revision.
239 # The nullid in the deltabasenode slot indicates that the revision is a
239 # The nullid in the deltabasenode slot indicates that the revision is a
240 # fulltext.
240 # fulltext.
241 revision = self.get(name, node)
241 revision = self.get(name, node)
242 return [(name, node, None, nullid, revision)]
242 return [(name, node, None, nullid, revision)]
243
243
244 def getmeta(self, name, node):
244 def getmeta(self, name, node):
245 self._fileservice.prefetch([(name, hex(node))], force=True,
245 self._fileservice.prefetch([(name, hex(node))], force=True,
246 fetchdata=True)
246 fetchdata=True)
247 return self._shared.getmeta(name, node)
247 return self._shared.getmeta(name, node)
248
248
249 def add(self, name, node, data):
249 def add(self, name, node, data):
250 raise RuntimeError("cannot add to a remote store")
250 raise RuntimeError("cannot add to a remote store")
251
251
252 def getmissing(self, keys):
252 def getmissing(self, keys):
253 return keys
253 return keys
254
254
255 def markledger(self, ledger, options=None):
255 def markledger(self, ledger, options=None):
256 pass
256 pass
257
257
258 class manifestrevlogstore(object):
258 class manifestrevlogstore(object):
259 def __init__(self, repo):
259 def __init__(self, repo):
260 self._store = repo.store
260 self._store = repo.store
261 self._svfs = repo.svfs
261 self._svfs = repo.svfs
262 self._revlogs = dict()
262 self._revlogs = dict()
263 self._cl = revlog.revlog(self._svfs, '00changelog.i')
263 self._cl = revlog.revlog(self._svfs, '00changelog.i')
264 self._repackstartlinkrev = 0
264 self._repackstartlinkrev = 0
265
265
266 def get(self, name, node):
266 def get(self, name, node):
267 - return self._revlog(name).revision(node, raw=True)
267 + return self._revlog(name).rawdata(node)
268
268
269 def getdelta(self, name, node):
269 def getdelta(self, name, node):
270 revision = self.get(name, node)
270 revision = self.get(name, node)
271 return revision, name, nullid, self.getmeta(name, node)
271 return revision, name, nullid, self.getmeta(name, node)
272
272
273 def getdeltachain(self, name, node):
273 def getdeltachain(self, name, node):
274 revision = self.get(name, node)
274 revision = self.get(name, node)
275 return [(name, node, None, nullid, revision)]
275 return [(name, node, None, nullid, revision)]
276
276
277 def getmeta(self, name, node):
277 def getmeta(self, name, node):
278 rl = self._revlog(name)
278 rl = self._revlog(name)
279 rev = rl.rev(node)
279 rev = rl.rev(node)
280 return {constants.METAKEYFLAG: rl.flags(rev),
280 return {constants.METAKEYFLAG: rl.flags(rev),
281 constants.METAKEYSIZE: rl.rawsize(rev)}
281 constants.METAKEYSIZE: rl.rawsize(rev)}
282
282
283 def getancestors(self, name, node, known=None):
283 def getancestors(self, name, node, known=None):
284 if known is None:
284 if known is None:
285 known = set()
285 known = set()
286 if node in known:
286 if node in known:
287 return []
287 return []
288
288
289 rl = self._revlog(name)
289 rl = self._revlog(name)
290 ancestors = {}
290 ancestors = {}
291 missing = set((node,))
291 missing = set((node,))
292 for ancrev in rl.ancestors([rl.rev(node)], inclusive=True):
292 for ancrev in rl.ancestors([rl.rev(node)], inclusive=True):
293 ancnode = rl.node(ancrev)
293 ancnode = rl.node(ancrev)
294 missing.discard(ancnode)
294 missing.discard(ancnode)
295
295
296 p1, p2 = rl.parents(ancnode)
296 p1, p2 = rl.parents(ancnode)
297 if p1 != nullid and p1 not in known:
297 if p1 != nullid and p1 not in known:
298 missing.add(p1)
298 missing.add(p1)
299 if p2 != nullid and p2 not in known:
299 if p2 != nullid and p2 not in known:
300 missing.add(p2)
300 missing.add(p2)
301
301
302 linknode = self._cl.node(rl.linkrev(ancrev))
302 linknode = self._cl.node(rl.linkrev(ancrev))
303 ancestors[rl.node(ancrev)] = (p1, p2, linknode, '')
303 ancestors[rl.node(ancrev)] = (p1, p2, linknode, '')
304 if not missing:
304 if not missing:
305 break
305 break
306 return ancestors
306 return ancestors
307
307
308 def getnodeinfo(self, name, node):
308 def getnodeinfo(self, name, node):
309 cl = self._cl
309 cl = self._cl
310 rl = self._revlog(name)
310 rl = self._revlog(name)
311 parents = rl.parents(node)
311 parents = rl.parents(node)
312 linkrev = rl.linkrev(rl.rev(node))
312 linkrev = rl.linkrev(rl.rev(node))
313 return (parents[0], parents[1], cl.node(linkrev), None)
313 return (parents[0], parents[1], cl.node(linkrev), None)
314
314
315 def add(self, *args):
315 def add(self, *args):
316 raise RuntimeError("cannot add to a revlog store")
316 raise RuntimeError("cannot add to a revlog store")
317
317
318 def _revlog(self, name):
318 def _revlog(self, name):
319 rl = self._revlogs.get(name)
319 rl = self._revlogs.get(name)
320 if rl is None:
320 if rl is None:
321 revlogname = '00manifesttree.i'
321 revlogname = '00manifesttree.i'
322 if name != '':
322 if name != '':
323 revlogname = 'meta/%s/00manifest.i' % name
323 revlogname = 'meta/%s/00manifest.i' % name
324 rl = revlog.revlog(self._svfs, revlogname)
324 rl = revlog.revlog(self._svfs, revlogname)
325 self._revlogs[name] = rl
325 self._revlogs[name] = rl
326 return rl
326 return rl
327
327
328 def getmissing(self, keys):
328 def getmissing(self, keys):
329 missing = []
329 missing = []
330 for name, node in keys:
330 for name, node in keys:
331 mfrevlog = self._revlog(name)
331 mfrevlog = self._revlog(name)
332 if node not in mfrevlog.nodemap:
332 if node not in mfrevlog.nodemap:
333 missing.append((name, node))
333 missing.append((name, node))
334
334
335 return missing
335 return missing
336
336
337 def setrepacklinkrevrange(self, startrev, endrev):
337 def setrepacklinkrevrange(self, startrev, endrev):
338 self._repackstartlinkrev = startrev
338 self._repackstartlinkrev = startrev
339 self._repackendlinkrev = endrev
339 self._repackendlinkrev = endrev
340
340
341 def markledger(self, ledger, options=None):
341 def markledger(self, ledger, options=None):
342 if options and options.get(constants.OPTION_PACKSONLY):
342 if options and options.get(constants.OPTION_PACKSONLY):
343 return
343 return
344 treename = ''
344 treename = ''
345 rl = revlog.revlog(self._svfs, '00manifesttree.i')
345 rl = revlog.revlog(self._svfs, '00manifesttree.i')
346 startlinkrev = self._repackstartlinkrev
346 startlinkrev = self._repackstartlinkrev
347 endlinkrev = self._repackendlinkrev
347 endlinkrev = self._repackendlinkrev
348 for rev in pycompat.xrange(len(rl) - 1, -1, -1):
348 for rev in pycompat.xrange(len(rl) - 1, -1, -1):
349 linkrev = rl.linkrev(rev)
349 linkrev = rl.linkrev(rev)
350 if linkrev < startlinkrev:
350 if linkrev < startlinkrev:
351 break
351 break
352 if linkrev > endlinkrev:
352 if linkrev > endlinkrev:
353 continue
353 continue
354 node = rl.node(rev)
354 node = rl.node(rev)
355 ledger.markdataentry(self, treename, node)
355 ledger.markdataentry(self, treename, node)
356 ledger.markhistoryentry(self, treename, node)
356 ledger.markhistoryentry(self, treename, node)
357
357
358 for path, encoded, size in self._store.datafiles():
358 for path, encoded, size in self._store.datafiles():
359 if path[:5] != 'meta/' or path[-2:] != '.i':
359 if path[:5] != 'meta/' or path[-2:] != '.i':
360 continue
360 continue
361
361
362 treename = path[5:-len('/00manifest.i')]
362 treename = path[5:-len('/00manifest.i')]
363
363
364 rl = revlog.revlog(self._svfs, path)
364 rl = revlog.revlog(self._svfs, path)
365 for rev in pycompat.xrange(len(rl) - 1, -1, -1):
365 for rev in pycompat.xrange(len(rl) - 1, -1, -1):
366 linkrev = rl.linkrev(rev)
366 linkrev = rl.linkrev(rev)
367 if linkrev < startlinkrev:
367 if linkrev < startlinkrev:
368 break
368 break
369 if linkrev > endlinkrev:
369 if linkrev > endlinkrev:
370 continue
370 continue
371 node = rl.node(rev)
371 node = rl.node(rev)
372 ledger.markdataentry(self, treename, node)
372 ledger.markdataentry(self, treename, node)
373 ledger.markhistoryentry(self, treename, node)
373 ledger.markhistoryentry(self, treename, node)
374
374
375 def cleanup(self, ledger):
375 def cleanup(self, ledger):
376 pass
376 pass
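In the contentstore hunk above, manifestrevlogstore.get() is the call site that changes (the 267 -/+ pair): it now asks the underlying revlog for rawdata(node) instead of revision(node, raw=True). A short, hedged usage sketch, assuming a repo object is available (the variable names `repo` and `mfnode` are illustrative):

    # Illustrative only: read the raw root-manifest text through the store.
    # An empty name selects 00manifesttree.i, per _revlog() above.
    store = manifestrevlogstore(repo)
    rawtext = store.get('', mfnode)   # flag-unprocessed bytes for that node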
@@ -1,584 +1,584 b''
1 # fileserverclient.py - client for communicating with the cache process
1 # fileserverclient.py - client for communicating with the cache process
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import io
11 import io
12 import os
12 import os
13 import threading
13 import threading
14 import time
14 import time
15 import zlib
15 import zlib
16
16
17 from mercurial.i18n import _
17 from mercurial.i18n import _
18 from mercurial.node import bin, hex, nullid
18 from mercurial.node import bin, hex, nullid
19 from mercurial import (
19 from mercurial import (
20 error,
20 error,
21 node,
21 node,
22 pycompat,
22 pycompat,
23 revlog,
23 revlog,
24 sshpeer,
24 sshpeer,
25 util,
25 util,
26 wireprotov1peer,
26 wireprotov1peer,
27 )
27 )
28 from mercurial.utils import procutil
28 from mercurial.utils import procutil
29
29
30 from . import (
30 from . import (
31 constants,
31 constants,
32 contentstore,
32 contentstore,
33 metadatastore,
33 metadatastore,
34 )
34 )
35
35
36 _sshv1peer = sshpeer.sshv1peer
36 _sshv1peer = sshpeer.sshv1peer
37
37
38 # Statistics for debugging
38 # Statistics for debugging
39 fetchcost = 0
39 fetchcost = 0
40 fetches = 0
40 fetches = 0
41 fetched = 0
41 fetched = 0
42 fetchmisses = 0
42 fetchmisses = 0
43
43
44 _lfsmod = None
44 _lfsmod = None
45
45
46 def getcachekey(reponame, file, id):
46 def getcachekey(reponame, file, id):
47 pathhash = node.hex(hashlib.sha1(file).digest())
47 pathhash = node.hex(hashlib.sha1(file).digest())
48 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
48 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
49
49
50 def getlocalkey(file, id):
50 def getlocalkey(file, id):
51 pathhash = node.hex(hashlib.sha1(file).digest())
51 pathhash = node.hex(hashlib.sha1(file).digest())
52 return os.path.join(pathhash, id)
52 return os.path.join(pathhash, id)
53
53
54 def peersetup(ui, peer):
54 def peersetup(ui, peer):
55
55
56 class remotefilepeer(peer.__class__):
56 class remotefilepeer(peer.__class__):
57 @wireprotov1peer.batchable
57 @wireprotov1peer.batchable
58 def x_rfl_getfile(self, file, node):
58 def x_rfl_getfile(self, file, node):
59 if not self.capable('x_rfl_getfile'):
59 if not self.capable('x_rfl_getfile'):
60 raise error.Abort(
60 raise error.Abort(
61 'configured remotefile server does not support getfile')
61 'configured remotefile server does not support getfile')
62 f = wireprotov1peer.future()
62 f = wireprotov1peer.future()
63 yield {'file': file, 'node': node}, f
63 yield {'file': file, 'node': node}, f
64 code, data = f.value.split('\0', 1)
64 code, data = f.value.split('\0', 1)
65 if int(code):
65 if int(code):
66 raise error.LookupError(file, node, data)
66 raise error.LookupError(file, node, data)
67 yield data
67 yield data
68
68
69 @wireprotov1peer.batchable
69 @wireprotov1peer.batchable
70 def x_rfl_getflogheads(self, path):
70 def x_rfl_getflogheads(self, path):
71 if not self.capable('x_rfl_getflogheads'):
71 if not self.capable('x_rfl_getflogheads'):
72 raise error.Abort('configured remotefile server does not '
72 raise error.Abort('configured remotefile server does not '
73 'support getflogheads')
73 'support getflogheads')
74 f = wireprotov1peer.future()
74 f = wireprotov1peer.future()
75 yield {'path': path}, f
75 yield {'path': path}, f
76 heads = f.value.split('\n') if f.value else []
76 heads = f.value.split('\n') if f.value else []
77 yield heads
77 yield heads
78
78
79 def _updatecallstreamopts(self, command, opts):
79 def _updatecallstreamopts(self, command, opts):
80 if command != 'getbundle':
80 if command != 'getbundle':
81 return
81 return
82 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
82 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
83 not in self.capabilities()):
83 not in self.capabilities()):
84 return
84 return
85 if not util.safehasattr(self, '_localrepo'):
85 if not util.safehasattr(self, '_localrepo'):
86 return
86 return
87 if (constants.SHALLOWREPO_REQUIREMENT
87 if (constants.SHALLOWREPO_REQUIREMENT
88 not in self._localrepo.requirements):
88 not in self._localrepo.requirements):
89 return
89 return
90
90
91 bundlecaps = opts.get('bundlecaps')
91 bundlecaps = opts.get('bundlecaps')
92 if bundlecaps:
92 if bundlecaps:
93 bundlecaps = [bundlecaps]
93 bundlecaps = [bundlecaps]
94 else:
94 else:
95 bundlecaps = []
95 bundlecaps = []
96
96
97 # shallow, includepattern, and excludepattern are a hacky way of
97 # shallow, includepattern, and excludepattern are a hacky way of
98 # carrying over data from the local repo to this getbundle
98 # carrying over data from the local repo to this getbundle
99 # command. We need to do it this way because bundle1 getbundle
99 # command. We need to do it this way because bundle1 getbundle
100 # doesn't provide any other place we can hook in to manipulate
100 # doesn't provide any other place we can hook in to manipulate
101 # getbundle args before it goes across the wire. Once we get rid
101 # getbundle args before it goes across the wire. Once we get rid
102 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
102 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
103 # do this more cleanly.
103 # do this more cleanly.
104 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
104 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
105 if self._localrepo.includepattern:
105 if self._localrepo.includepattern:
106 patterns = '\0'.join(self._localrepo.includepattern)
106 patterns = '\0'.join(self._localrepo.includepattern)
107 includecap = "includepattern=" + patterns
107 includecap = "includepattern=" + patterns
108 bundlecaps.append(includecap)
108 bundlecaps.append(includecap)
109 if self._localrepo.excludepattern:
109 if self._localrepo.excludepattern:
110 patterns = '\0'.join(self._localrepo.excludepattern)
110 patterns = '\0'.join(self._localrepo.excludepattern)
111 excludecap = "excludepattern=" + patterns
111 excludecap = "excludepattern=" + patterns
112 bundlecaps.append(excludecap)
112 bundlecaps.append(excludecap)
113 opts['bundlecaps'] = ','.join(bundlecaps)
113 opts['bundlecaps'] = ','.join(bundlecaps)
114
114
115 def _sendrequest(self, command, args, **opts):
115 def _sendrequest(self, command, args, **opts):
116 self._updatecallstreamopts(command, args)
116 self._updatecallstreamopts(command, args)
117 return super(remotefilepeer, self)._sendrequest(command, args,
117 return super(remotefilepeer, self)._sendrequest(command, args,
118 **opts)
118 **opts)
119
119
120 def _callstream(self, command, **opts):
120 def _callstream(self, command, **opts):
121 supertype = super(remotefilepeer, self)
121 supertype = super(remotefilepeer, self)
122 if not util.safehasattr(supertype, '_sendrequest'):
122 if not util.safehasattr(supertype, '_sendrequest'):
123 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
123 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
124 return super(remotefilepeer, self)._callstream(command, **opts)
124 return super(remotefilepeer, self)._callstream(command, **opts)
125
125
126 peer.__class__ = remotefilepeer
126 peer.__class__ = remotefilepeer
127
127
128 class cacheconnection(object):
128 class cacheconnection(object):
129 """The connection for communicating with the remote cache. Performs
129 """The connection for communicating with the remote cache. Performs
130 gets and sets by communicating with an external process that has the
130 gets and sets by communicating with an external process that has the
131 cache-specific implementation.
131 cache-specific implementation.
132 """
132 """
133 def __init__(self):
133 def __init__(self):
134 self.pipeo = self.pipei = self.pipee = None
134 self.pipeo = self.pipei = self.pipee = None
135 self.subprocess = None
135 self.subprocess = None
136 self.connected = False
136 self.connected = False
137
137
138 def connect(self, cachecommand):
138 def connect(self, cachecommand):
139 if self.pipeo:
139 if self.pipeo:
140 raise error.Abort(_("cache connection already open"))
140 raise error.Abort(_("cache connection already open"))
141 self.pipei, self.pipeo, self.pipee, self.subprocess = (
141 self.pipei, self.pipeo, self.pipee, self.subprocess = (
142 procutil.popen4(cachecommand))
142 procutil.popen4(cachecommand))
143 self.connected = True
143 self.connected = True
144
144
145 def close(self):
145 def close(self):
146 def tryclose(pipe):
146 def tryclose(pipe):
147 try:
147 try:
148 pipe.close()
148 pipe.close()
149 except Exception:
149 except Exception:
150 pass
150 pass
151 if self.connected:
151 if self.connected:
152 try:
152 try:
153 self.pipei.write("exit\n")
153 self.pipei.write("exit\n")
154 except Exception:
154 except Exception:
155 pass
155 pass
156 tryclose(self.pipei)
156 tryclose(self.pipei)
157 self.pipei = None
157 self.pipei = None
158 tryclose(self.pipeo)
158 tryclose(self.pipeo)
159 self.pipeo = None
159 self.pipeo = None
160 tryclose(self.pipee)
160 tryclose(self.pipee)
161 self.pipee = None
161 self.pipee = None
162 try:
162 try:
163 # Wait for process to terminate, making sure to avoid deadlock.
163 # Wait for process to terminate, making sure to avoid deadlock.
164 # See https://docs.python.org/2/library/subprocess.html for
164 # See https://docs.python.org/2/library/subprocess.html for
165 # warnings about wait() and deadlocking.
165 # warnings about wait() and deadlocking.
166 self.subprocess.communicate()
166 self.subprocess.communicate()
167 except Exception:
167 except Exception:
168 pass
168 pass
169 self.subprocess = None
169 self.subprocess = None
170 self.connected = False
170 self.connected = False
171
171
172 def request(self, request, flush=True):
172 def request(self, request, flush=True):
173 if self.connected:
173 if self.connected:
174 try:
174 try:
175 self.pipei.write(request)
175 self.pipei.write(request)
176 if flush:
176 if flush:
177 self.pipei.flush()
177 self.pipei.flush()
178 except IOError:
178 except IOError:
179 self.close()
179 self.close()
180
180
181 def receiveline(self):
181 def receiveline(self):
182 if not self.connected:
182 if not self.connected:
183 return None
183 return None
184 try:
184 try:
185 result = self.pipeo.readline()[:-1]
185 result = self.pipeo.readline()[:-1]
186 if not result:
186 if not result:
187 self.close()
187 self.close()
188 except IOError:
188 except IOError:
189 self.close()
189 self.close()
190
190
191 return result
191 return result
192
192
193 def _getfilesbatch(
193 def _getfilesbatch(
194 remote, receivemissing, progresstick, missed, idmap, batchsize):
194 remote, receivemissing, progresstick, missed, idmap, batchsize):
195 # Over http(s), iterbatch is a streamy method and we can start
195 # Over http(s), iterbatch is a streamy method and we can start
196 # looking at results early. This means we send one (potentially
196 # looking at results early. This means we send one (potentially
197 # large) request, but then we show nice progress as we process
197 # large) request, but then we show nice progress as we process
198 # file results, rather than showing chunks of $batchsize in
198 # file results, rather than showing chunks of $batchsize in
199 # progress.
199 # progress.
200 #
200 #
201 # Over ssh, iterbatch isn't streamy because batch() wasn't
201 # Over ssh, iterbatch isn't streamy because batch() wasn't
202 # explicitly designed as a streaming method. In the future we
202 # explicitly designed as a streaming method. In the future we
203 # should probably introduce a streambatch() method upstream and
203 # should probably introduce a streambatch() method upstream and
204 # use that for this.
204 # use that for this.
205 with remote.commandexecutor() as e:
205 with remote.commandexecutor() as e:
206 futures = []
206 futures = []
207 for m in missed:
207 for m in missed:
208 futures.append(e.callcommand('x_rfl_getfile', {
208 futures.append(e.callcommand('x_rfl_getfile', {
209 'file': idmap[m],
209 'file': idmap[m],
210 'node': m[-40:]
210 'node': m[-40:]
211 }))
211 }))
212
212
213 for i, m in enumerate(missed):
213 for i, m in enumerate(missed):
214 r = futures[i].result()
214 r = futures[i].result()
215 futures[i] = None # release memory
215 futures[i] = None # release memory
216 file_ = idmap[m]
216 file_ = idmap[m]
217 node = m[-40:]
217 node = m[-40:]
218 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
218 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
219 progresstick()
219 progresstick()
220
220
221 def _getfiles_optimistic(
221 def _getfiles_optimistic(
222 remote, receivemissing, progresstick, missed, idmap, step):
222 remote, receivemissing, progresstick, missed, idmap, step):
223 remote._callstream("x_rfl_getfiles")
223 remote._callstream("x_rfl_getfiles")
224 i = 0
224 i = 0
225 pipeo = remote._pipeo
225 pipeo = remote._pipeo
226 pipei = remote._pipei
226 pipei = remote._pipei
227 while i < len(missed):
227 while i < len(missed):
228 # issue a batch of requests
228 # issue a batch of requests
229 start = i
229 start = i
230 end = min(len(missed), start + step)
230 end = min(len(missed), start + step)
231 i = end
231 i = end
232 for missingid in missed[start:end]:
232 for missingid in missed[start:end]:
233 # issue new request
233 # issue new request
234 versionid = missingid[-40:]
234 versionid = missingid[-40:]
235 file = idmap[missingid]
235 file = idmap[missingid]
236 sshrequest = "%s%s\n" % (versionid, file)
236 sshrequest = "%s%s\n" % (versionid, file)
237 pipeo.write(sshrequest)
237 pipeo.write(sshrequest)
238 pipeo.flush()
238 pipeo.flush()
239
239
240 # receive batch results
240 # receive batch results
241 for missingid in missed[start:end]:
241 for missingid in missed[start:end]:
242 versionid = missingid[-40:]
242 versionid = missingid[-40:]
243 file = idmap[missingid]
243 file = idmap[missingid]
244 receivemissing(pipei, file, versionid)
244 receivemissing(pipei, file, versionid)
245 progresstick()
245 progresstick()
246
246
247 # End the command
247 # End the command
248 pipeo.write('\n')
248 pipeo.write('\n')
249 pipeo.flush()
249 pipeo.flush()
250
250
251 def _getfiles_threaded(
251 def _getfiles_threaded(
252 remote, receivemissing, progresstick, missed, idmap, step):
252 remote, receivemissing, progresstick, missed, idmap, step):
253 remote._callstream("getfiles")
253 remote._callstream("getfiles")
254 pipeo = remote._pipeo
254 pipeo = remote._pipeo
255 pipei = remote._pipei
255 pipei = remote._pipei
256
256
257 def writer():
257 def writer():
258 for missingid in missed:
258 for missingid in missed:
259 versionid = missingid[-40:]
259 versionid = missingid[-40:]
260 file = idmap[missingid]
260 file = idmap[missingid]
261 sshrequest = "%s%s\n" % (versionid, file)
261 sshrequest = "%s%s\n" % (versionid, file)
262 pipeo.write(sshrequest)
262 pipeo.write(sshrequest)
263 pipeo.flush()
263 pipeo.flush()
264 writerthread = threading.Thread(target=writer)
264 writerthread = threading.Thread(target=writer)
265 writerthread.daemon = True
265 writerthread.daemon = True
266 writerthread.start()
266 writerthread.start()
267
267
268 for missingid in missed:
268 for missingid in missed:
269 versionid = missingid[-40:]
269 versionid = missingid[-40:]
270 file = idmap[missingid]
270 file = idmap[missingid]
271 receivemissing(pipei, file, versionid)
271 receivemissing(pipei, file, versionid)
272 progresstick()
272 progresstick()
273
273
274 writerthread.join()
274 writerthread.join()
275 # End the command
275 # End the command
276 pipeo.write('\n')
276 pipeo.write('\n')
277 pipeo.flush()
277 pipeo.flush()
278
278
279 class fileserverclient(object):
279 class fileserverclient(object):
280 """A client for requesting files from the remote file server.
280 """A client for requesting files from the remote file server.
281 """
281 """
282 def __init__(self, repo):
282 def __init__(self, repo):
283 ui = repo.ui
283 ui = repo.ui
284 self.repo = repo
284 self.repo = repo
285 self.ui = ui
285 self.ui = ui
286 self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
286 self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
287 if self.cacheprocess:
287 if self.cacheprocess:
288 self.cacheprocess = util.expandpath(self.cacheprocess)
288 self.cacheprocess = util.expandpath(self.cacheprocess)
289
289
290 # This option causes remotefilelog to pass the full file path to the
290 # This option causes remotefilelog to pass the full file path to the
291 # cacheprocess instead of a hashed key.
291 # cacheprocess instead of a hashed key.
292 self.cacheprocesspasspath = ui.configbool(
292 self.cacheprocesspasspath = ui.configbool(
293 "remotefilelog", "cacheprocess.includepath")
293 "remotefilelog", "cacheprocess.includepath")
294
294
295 self.debugoutput = ui.configbool("remotefilelog", "debug")
295 self.debugoutput = ui.configbool("remotefilelog", "debug")
296
296
297 self.remotecache = cacheconnection()
297 self.remotecache = cacheconnection()
298
298
299 def setstore(self, datastore, historystore, writedata, writehistory):
299 def setstore(self, datastore, historystore, writedata, writehistory):
300 self.datastore = datastore
300 self.datastore = datastore
301 self.historystore = historystore
301 self.historystore = historystore
302 self.writedata = writedata
302 self.writedata = writedata
303 self.writehistory = writehistory
303 self.writehistory = writehistory
304
304
305 def _connect(self):
305 def _connect(self):
306 return self.repo.connectionpool.get(self.repo.fallbackpath)
306 return self.repo.connectionpool.get(self.repo.fallbackpath)
307
307
308 def request(self, fileids):
308 def request(self, fileids):
309 """Takes a list of filename/node pairs and fetches them from the
309 """Takes a list of filename/node pairs and fetches them from the
310 server. Files are stored in the local cache.
310 server. Files are stored in the local cache.
311 A list of nodes that the server couldn't find is returned.
311 A list of nodes that the server couldn't find is returned.
312 If the connection fails, an exception is raised.
312 If the connection fails, an exception is raised.
313 """
313 """
314 if not self.remotecache.connected:
314 if not self.remotecache.connected:
315 self.connect()
315 self.connect()
316 cache = self.remotecache
316 cache = self.remotecache
317 writedata = self.writedata
317 writedata = self.writedata
318
318
319 repo = self.repo
319 repo = self.repo
320 total = len(fileids)
320 total = len(fileids)
321 request = "get\n%d\n" % total
321 request = "get\n%d\n" % total
322 idmap = {}
322 idmap = {}
323 reponame = repo.name
323 reponame = repo.name
324 for file, id in fileids:
324 for file, id in fileids:
325 fullid = getcachekey(reponame, file, id)
325 fullid = getcachekey(reponame, file, id)
326 if self.cacheprocesspasspath:
326 if self.cacheprocesspasspath:
327 request += file + '\0'
327 request += file + '\0'
328 request += fullid + "\n"
328 request += fullid + "\n"
329 idmap[fullid] = file
329 idmap[fullid] = file
330
330
331 cache.request(request)
331 cache.request(request)
332
332
333 progress = self.ui.makeprogress(_('downloading'), total=total)
333 progress = self.ui.makeprogress(_('downloading'), total=total)
334 progress.update(0)
334 progress.update(0)
335
335
336 missed = []
336 missed = []
337 while True:
337 while True:
338 missingid = cache.receiveline()
338 missingid = cache.receiveline()
339 if not missingid:
339 if not missingid:
340 missedset = set(missed)
340 missedset = set(missed)
341 for missingid in idmap:
341 for missingid in idmap:
342 if not missingid in missedset:
342 if not missingid in missedset:
343 missed.append(missingid)
343 missed.append(missingid)
344 self.ui.warn(_("warning: cache connection closed early - " +
344 self.ui.warn(_("warning: cache connection closed early - " +
345 "falling back to server\n"))
345 "falling back to server\n"))
346 break
346 break
347 if missingid == "0":
347 if missingid == "0":
348 break
348 break
349 if missingid.startswith("_hits_"):
349 if missingid.startswith("_hits_"):
350 # receive progress reports
350 # receive progress reports
351 parts = missingid.split("_")
351 parts = missingid.split("_")
352 progress.increment(int(parts[2]))
352 progress.increment(int(parts[2]))
353 continue
353 continue
354
354
355 missed.append(missingid)
355 missed.append(missingid)
356
356
357 global fetchmisses
357 global fetchmisses
358 fetchmisses += len(missed)
358 fetchmisses += len(missed)
359
359
360 fromcache = total - len(missed)
360 fromcache = total - len(missed)
361 progress.update(fromcache, total=total)
361 progress.update(fromcache, total=total)
362 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
362 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
363 fromcache, total, hit=fromcache, total=total)
363 fromcache, total, hit=fromcache, total=total)
364
364
365 oldumask = os.umask(0o002)
365 oldumask = os.umask(0o002)
366 try:
366 try:
367 # receive cache misses from master
367 # receive cache misses from master
368 if missed:
368 if missed:
369 # When verbose is true, sshpeer prints 'running ssh...'
369 # When verbose is true, sshpeer prints 'running ssh...'
370 # to stdout, which can interfere with some command
370 # to stdout, which can interfere with some command
371 # outputs
371 # outputs
372 verbose = self.ui.verbose
372 verbose = self.ui.verbose
373 self.ui.verbose = False
373 self.ui.verbose = False
374 try:
374 try:
375 with self._connect() as conn:
375 with self._connect() as conn:
376 remote = conn.peer
376 remote = conn.peer
377 if remote.capable(
377 if remote.capable(
378 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
378 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
379 if not isinstance(remote, _sshv1peer):
379 if not isinstance(remote, _sshv1peer):
380 raise error.Abort('remotefilelog requires ssh '
380 raise error.Abort('remotefilelog requires ssh '
381 'servers')
381 'servers')
382 step = self.ui.configint('remotefilelog',
382 step = self.ui.configint('remotefilelog',
383 'getfilesstep')
383 'getfilesstep')
384 getfilestype = self.ui.config('remotefilelog',
384 getfilestype = self.ui.config('remotefilelog',
385 'getfilestype')
385 'getfilestype')
386 if getfilestype == 'threaded':
386 if getfilestype == 'threaded':
387 _getfiles = _getfiles_threaded
387 _getfiles = _getfiles_threaded
388 else:
388 else:
389 _getfiles = _getfiles_optimistic
389 _getfiles = _getfiles_optimistic
390 _getfiles(remote, self.receivemissing,
390 _getfiles(remote, self.receivemissing,
391 progress.increment, missed, idmap, step)
391 progress.increment, missed, idmap, step)
392 elif remote.capable("x_rfl_getfile"):
392 elif remote.capable("x_rfl_getfile"):
393 if remote.capable('batch'):
393 if remote.capable('batch'):
394 batchdefault = 100
394 batchdefault = 100
395 else:
395 else:
396 batchdefault = 10
396 batchdefault = 10
397 batchsize = self.ui.configint(
397 batchsize = self.ui.configint(
398 'remotefilelog', 'batchsize', batchdefault)
398 'remotefilelog', 'batchsize', batchdefault)
399 self.ui.debug(
399 self.ui.debug(
400 b'requesting %d files from '
400 b'requesting %d files from '
401 b'remotefilelog server...\n' % len(missed))
401 b'remotefilelog server...\n' % len(missed))
402 _getfilesbatch(
402 _getfilesbatch(
403 remote, self.receivemissing, progress.increment,
403 remote, self.receivemissing, progress.increment,
404 missed, idmap, batchsize)
404 missed, idmap, batchsize)
405 else:
405 else:
406 raise error.Abort("configured remotefilelog server"
406 raise error.Abort("configured remotefilelog server"
407 " does not support remotefilelog")
407 " does not support remotefilelog")
408
408
409 self.ui.log("remotefilefetchlog",
409 self.ui.log("remotefilefetchlog",
410 "Success\n",
410 "Success\n",
411 fetched_files = progress.pos - fromcache,
411 fetched_files = progress.pos - fromcache,
412 total_to_fetch = total - fromcache)
412 total_to_fetch = total - fromcache)
413 except Exception:
413 except Exception:
414 self.ui.log("remotefilefetchlog",
414 self.ui.log("remotefilefetchlog",
415 "Fail\n",
415 "Fail\n",
416 fetched_files = progress.pos - fromcache,
416 fetched_files = progress.pos - fromcache,
417 total_to_fetch = total - fromcache)
417 total_to_fetch = total - fromcache)
418 raise
418 raise
419 finally:
419 finally:
420 self.ui.verbose = verbose
420 self.ui.verbose = verbose
421 # send to memcache
421 # send to memcache
422 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed))
422 request = "set\n%d\n%s\n" % (len(missed), "\n".join(missed))
423 cache.request(request)
423 cache.request(request)
424
424
425 progress.complete()
425 progress.complete()
426
426
427 # mark ourselves as a user of this cache
427 # mark ourselves as a user of this cache
428 writedata.markrepo(self.repo.path)
428 writedata.markrepo(self.repo.path)
429 finally:
429 finally:
430 os.umask(oldumask)
430 os.umask(oldumask)
431
431
432 def receivemissing(self, pipe, filename, node):
432 def receivemissing(self, pipe, filename, node):
433 line = pipe.readline()[:-1]
433 line = pipe.readline()[:-1]
434 if not line:
434 if not line:
435 raise error.ResponseError(_("error downloading file contents:"),
435 raise error.ResponseError(_("error downloading file contents:"),
436 _("connection closed early"))
436 _("connection closed early"))
437 size = int(line)
437 size = int(line)
438 data = pipe.read(size)
438 data = pipe.read(size)
439 if len(data) != size:
439 if len(data) != size:
440 raise error.ResponseError(_("error downloading file contents:"),
440 raise error.ResponseError(_("error downloading file contents:"),
441 _("only received %s of %s bytes")
441 _("only received %s of %s bytes")
442 % (len(data), size))
442 % (len(data), size))
443
443
444 self.writedata.addremotefilelognode(filename, bin(node),
444 self.writedata.addremotefilelognode(filename, bin(node),
445 zlib.decompress(data))
445 zlib.decompress(data))
446
446
447 def connect(self):
447 def connect(self):
448 if self.cacheprocess:
448 if self.cacheprocess:
449 cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
449 cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
450 self.remotecache.connect(cmd)
450 self.remotecache.connect(cmd)
451 else:
451 else:
452 # If no cache process is specified, we fake one that always
452 # If no cache process is specified, we fake one that always
453 # returns cache misses. This enables tests to run easily
453 # returns cache misses. This enables tests to run easily
454 # and may eventually allow us to be a drop in replacement
454 # and may eventually allow us to be a drop in replacement
455 # for the largefiles extension.
455 # for the largefiles extension.
456 class simplecache(object):
456 class simplecache(object):
457 def __init__(self):
457 def __init__(self):
458 self.missingids = []
458 self.missingids = []
459 self.connected = True
459 self.connected = True
460
460
461 def close(self):
461 def close(self):
462 pass
462 pass
463
463
464 def request(self, value, flush=True):
464 def request(self, value, flush=True):
465 lines = value.split("\n")
465 lines = value.split("\n")
466 if lines[0] != "get":
466 if lines[0] != "get":
467 return
467 return
468 self.missingids = lines[2:-1]
468 self.missingids = lines[2:-1]
469 self.missingids.append('0')
469 self.missingids.append('0')
470
470
471 def receiveline(self):
471 def receiveline(self):
472 if len(self.missingids) > 0:
472 if len(self.missingids) > 0:
473 return self.missingids.pop(0)
473 return self.missingids.pop(0)
474 return None
474 return None
475
475
476 self.remotecache = simplecache()
476 self.remotecache = simplecache()
477
477
478 def close(self):
478 def close(self):
479 if fetches:
479 if fetches:
480 msg = ("%d files fetched over %d fetches - " +
480 msg = ("%d files fetched over %d fetches - " +
481 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
481 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
482 fetched,
482 fetched,
483 fetches,
483 fetches,
484 fetchmisses,
484 fetchmisses,
485 float(fetched - fetchmisses) / float(fetched) * 100.0,
485 float(fetched - fetchmisses) / float(fetched) * 100.0,
486 fetchcost)
486 fetchcost)
487 if self.debugoutput:
487 if self.debugoutput:
488 self.ui.warn(msg)
488 self.ui.warn(msg)
489 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
489 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
490 remotefilelogfetched=fetched,
490 remotefilelogfetched=fetched,
491 remotefilelogfetches=fetches,
491 remotefilelogfetches=fetches,
492 remotefilelogfetchmisses=fetchmisses,
492 remotefilelogfetchmisses=fetchmisses,
493 remotefilelogfetchtime=fetchcost * 1000)
493 remotefilelogfetchtime=fetchcost * 1000)
494
494
495 if self.remotecache.connected:
495 if self.remotecache.connected:
496 self.remotecache.close()
496 self.remotecache.close()
497
497
498 def prefetch(self, fileids, force=False, fetchdata=True,
498 def prefetch(self, fileids, force=False, fetchdata=True,
499 fetchhistory=False):
499 fetchhistory=False):
500 """downloads the given file versions to the cache
500 """downloads the given file versions to the cache
501 """
501 """
502 repo = self.repo
502 repo = self.repo
503 idstocheck = []
503 idstocheck = []
504 for file, id in fileids:
504 for file, id in fileids:
505 # hack
505 # hack
506 # - we don't use .hgtags
506 # - we don't use .hgtags
507 # - workingctx produces ids with length 42,
507 # - workingctx produces ids with length 42,
508 # which we skip since they aren't in any cache
508 # which we skip since they aren't in any cache
509 if (file == '.hgtags' or len(id) == 42
509 if (file == '.hgtags' or len(id) == 42
510 or not repo.shallowmatch(file)):
510 or not repo.shallowmatch(file)):
511 continue
511 continue
512
512
513 idstocheck.append((file, bin(id)))
513 idstocheck.append((file, bin(id)))
514
514
515 datastore = self.datastore
515 datastore = self.datastore
516 historystore = self.historystore
516 historystore = self.historystore
517 if force:
517 if force:
518 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
518 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
519 historystore = metadatastore.unionmetadatastore(
519 historystore = metadatastore.unionmetadatastore(
520 *repo.sharedhistorystores)
520 *repo.sharedhistorystores)
521
521
522 missingids = set()
522 missingids = set()
523 if fetchdata:
523 if fetchdata:
524 missingids.update(datastore.getmissing(idstocheck))
524 missingids.update(datastore.getmissing(idstocheck))
525 if fetchhistory:
525 if fetchhistory:
526 missingids.update(historystore.getmissing(idstocheck))
526 missingids.update(historystore.getmissing(idstocheck))
527
527
528 # partition missing nodes into nullid and not-nullid so we can
528 # partition missing nodes into nullid and not-nullid so we can
529 # warn about this filtering potentially shadowing bugs.
529 # warn about this filtering potentially shadowing bugs.
530 nullids = len([None for unused, id in missingids if id == nullid])
530 nullids = len([None for unused, id in missingids if id == nullid])
531 if nullids:
531 if nullids:
532 missingids = [(f, id) for f, id in missingids if id != nullid]
532 missingids = [(f, id) for f, id in missingids if id != nullid]
533 repo.ui.develwarn(
533 repo.ui.develwarn(
534 ('remotefilelog not fetching %d null revs'
534 ('remotefilelog not fetching %d null revs'
535 ' - this is likely hiding bugs' % nullids),
535 ' - this is likely hiding bugs' % nullids),
536 config='remotefilelog-ext')
536 config='remotefilelog-ext')
537 if missingids:
537 if missingids:
538 global fetches, fetched, fetchcost
538 global fetches, fetched, fetchcost
539 fetches += 1
539 fetches += 1
540
540
541 # We want to be able to detect excess individual file downloads, so
541 # We want to be able to detect excess individual file downloads, so
542 # let's log that information for debugging.
542 # let's log that information for debugging.
543 if fetches >= 15 and fetches < 18:
543 if fetches >= 15 and fetches < 18:
544 if fetches == 15:
544 if fetches == 15:
545 fetchwarning = self.ui.config('remotefilelog',
545 fetchwarning = self.ui.config('remotefilelog',
546 'fetchwarning')
546 'fetchwarning')
547 if fetchwarning:
547 if fetchwarning:
548 self.ui.warn(fetchwarning + '\n')
548 self.ui.warn(fetchwarning + '\n')
549 self.logstacktrace()
549 self.logstacktrace()
550 missingids = [(file, hex(id)) for file, id in sorted(missingids)]
550 missingids = [(file, hex(id)) for file, id in sorted(missingids)]
551 fetched += len(missingids)
551 fetched += len(missingids)
552 start = time.time()
552 start = time.time()
553 missingids = self.request(missingids)
553 missingids = self.request(missingids)
554 if missingids:
554 if missingids:
555 raise error.Abort(_("unable to download %d files") %
555 raise error.Abort(_("unable to download %d files") %
556 len(missingids))
556 len(missingids))
557 fetchcost += time.time() - start
557 fetchcost += time.time() - start
558 self._lfsprefetch(fileids)
558 self._lfsprefetch(fileids)
559
559
560 def _lfsprefetch(self, fileids):
560 def _lfsprefetch(self, fileids):
561 if not _lfsmod or not util.safehasattr(
561 if not _lfsmod or not util.safehasattr(
562 self.repo.svfs, 'lfslocalblobstore'):
562 self.repo.svfs, 'lfslocalblobstore'):
563 return
563 return
564 if not _lfsmod.wrapper.candownload(self.repo):
564 if not _lfsmod.wrapper.candownload(self.repo):
565 return
565 return
566 pointers = []
566 pointers = []
567 store = self.repo.svfs.lfslocalblobstore
567 store = self.repo.svfs.lfslocalblobstore
568 for file, id in fileids:
568 for file, id in fileids:
569 node = bin(id)
569 node = bin(id)
570 rlog = self.repo.file(file)
570 rlog = self.repo.file(file)
571 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
571 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
572 text = rlog.revision(node, raw=True)
572 text = rlog.rawdata(node)
573 p = _lfsmod.pointer.deserialize(text)
573 p = _lfsmod.pointer.deserialize(text)
574 oid = p.oid()
574 oid = p.oid()
575 if not store.has(oid):
575 if not store.has(oid):
576 pointers.append(p)
576 pointers.append(p)
577 if len(pointers) > 0:
577 if len(pointers) > 0:
578 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
578 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
579 assert all(store.has(p.oid()) for p in pointers)
579 assert all(store.has(p.oid()) for p in pointers)
580
580
581 def logstacktrace(self):
581 def logstacktrace(self):
582 import traceback
582 import traceback
583 self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
583 self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
584 ''.join(traceback.format_stack()))
584 ''.join(traceback.format_stack()))
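The _lfsprefetch hunk above is one of the call sites this changeset switches from revision(node, raw=True) to rawdata(node): it only collects LFS pointers for revisions whose flags mark them as externally stored and whose blobs are not yet in the local store. A minimal sketch of that filtering step, with FakePointer and FakeStore as hypothetical stand-ins for the real lfs pointer and blobstore objects, and an assumed value for the extstored flag bit:

# The extstored flag bit below is an assumption for this sketch; the real
# value comes from mercurial.revlog.
REVIDX_EXTSTORED = 1 << 13

class FakePointer(object):
    # hypothetical stand-in for the lfs pointer object; only oid() is used
    def __init__(self, oid):
        self._oid = oid
    def oid(self):
        return self._oid

class FakeStore(object):
    # hypothetical stand-in for the local lfs blobstore; only has() is used
    def __init__(self, oids):
        self._oids = set(oids)
    def has(self, oid):
        return oid in self._oids

def collectmissingpointers(entries, store):
    """entries is an iterable of (flags, pointer) pairs; return the pointers
    flagged as externally stored whose blobs the local store does not have."""
    pointers = []
    for flags, p in entries:
        if flags & REVIDX_EXTSTORED and not store.has(p.oid()):
            pointers.append(p)
    return pointers

store = FakeStore(['aaa'])
entries = [(REVIDX_EXTSTORED, FakePointer('aaa')),
           (REVIDX_EXTSTORED, FakePointer('bbb')),
           (0, FakePointer('ccc'))]
assert [p.oid() for p in collectmissingpointers(entries, store)] == ['bbb']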
@@ -1,462 +1,462 b''
1 # remotefilelog.py - filelog implementation where filelog history is stored
1 # remotefilelog.py - filelog implementation where filelog history is stored
2 # remotely
2 # remotely
3 #
3 #
4 # Copyright 2013 Facebook, Inc.
4 # Copyright 2013 Facebook, Inc.
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import collections
10 import collections
11 import os
11 import os
12
12
13 from mercurial.node import (
13 from mercurial.node import (
14 bin,
14 bin,
15 nullid,
15 nullid,
16 wdirfilenodeids,
16 wdirfilenodeids,
17 wdirid,
17 wdirid,
18 )
18 )
19 from mercurial.i18n import _
19 from mercurial.i18n import _
20 from mercurial import (
20 from mercurial import (
21 ancestor,
21 ancestor,
22 error,
22 error,
23 mdiff,
23 mdiff,
24 revlog,
24 revlog,
25 )
25 )
26 from mercurial.utils import storageutil
26 from mercurial.utils import storageutil
27
27
28 from . import (
28 from . import (
29 constants,
29 constants,
30 fileserverclient,
30 fileserverclient,
31 shallowutil,
31 shallowutil,
32 )
32 )
33
33
34 class remotefilelognodemap(object):
34 class remotefilelognodemap(object):
35 def __init__(self, filename, store):
35 def __init__(self, filename, store):
36 self._filename = filename
36 self._filename = filename
37 self._store = store
37 self._store = store
38
38
39 def __contains__(self, node):
39 def __contains__(self, node):
40 missing = self._store.getmissing([(self._filename, node)])
40 missing = self._store.getmissing([(self._filename, node)])
41 return not bool(missing)
41 return not bool(missing)
42
42
43 def __get__(self, node):
43 def __get__(self, node):
44 if node not in self:
44 if node not in self:
45 raise KeyError(node)
45 raise KeyError(node)
46 return node
46 return node
47
47
48 class remotefilelog(object):
48 class remotefilelog(object):
49
49
50 _generaldelta = True
50 _generaldelta = True
51
51
52 def __init__(self, opener, path, repo):
52 def __init__(self, opener, path, repo):
53 self.opener = opener
53 self.opener = opener
54 self.filename = path
54 self.filename = path
55 self.repo = repo
55 self.repo = repo
56 self.nodemap = remotefilelognodemap(self.filename, repo.contentstore)
56 self.nodemap = remotefilelognodemap(self.filename, repo.contentstore)
57
57
58 self.version = 1
58 self.version = 1
59
59
60 def read(self, node):
60 def read(self, node):
61 """returns the file contents at this node"""
61 """returns the file contents at this node"""
62 t = self.revision(node)
62 t = self.revision(node)
63 if not t.startswith('\1\n'):
63 if not t.startswith('\1\n'):
64 return t
64 return t
65 s = t.index('\1\n', 2)
65 s = t.index('\1\n', 2)
66 return t[s + 2:]
66 return t[s + 2:]
67
67
68 def add(self, text, meta, transaction, linknode, p1=None, p2=None):
68 def add(self, text, meta, transaction, linknode, p1=None, p2=None):
69 # hash with the metadata, like in vanilla filelogs
69 # hash with the metadata, like in vanilla filelogs
70 hashtext = shallowutil.createrevlogtext(text, meta.get('copy'),
70 hashtext = shallowutil.createrevlogtext(text, meta.get('copy'),
71 meta.get('copyrev'))
71 meta.get('copyrev'))
72 node = storageutil.hashrevisionsha1(hashtext, p1, p2)
72 node = storageutil.hashrevisionsha1(hashtext, p1, p2)
73 return self.addrevision(hashtext, transaction, linknode, p1, p2,
73 return self.addrevision(hashtext, transaction, linknode, p1, p2,
74 node=node)
74 node=node)
75
75
76 def _createfileblob(self, text, meta, flags, p1, p2, node, linknode):
76 def _createfileblob(self, text, meta, flags, p1, p2, node, linknode):
77 # text passed to "_createfileblob" does not include filelog metadata
77 # text passed to "_createfileblob" does not include filelog metadata
78 header = shallowutil.buildfileblobheader(len(text), flags)
78 header = shallowutil.buildfileblobheader(len(text), flags)
79 data = "%s\0%s" % (header, text)
79 data = "%s\0%s" % (header, text)
80
80
81 realp1 = p1
81 realp1 = p1
82 copyfrom = ""
82 copyfrom = ""
83 if meta and 'copy' in meta:
83 if meta and 'copy' in meta:
84 copyfrom = meta['copy']
84 copyfrom = meta['copy']
85 realp1 = bin(meta['copyrev'])
85 realp1 = bin(meta['copyrev'])
86
86
87 data += "%s%s%s%s%s\0" % (node, realp1, p2, linknode, copyfrom)
87 data += "%s%s%s%s%s\0" % (node, realp1, p2, linknode, copyfrom)
88
88
89 visited = set()
89 visited = set()
90
90
91 pancestors = {}
91 pancestors = {}
92 queue = []
92 queue = []
93 if realp1 != nullid:
93 if realp1 != nullid:
94 p1flog = self
94 p1flog = self
95 if copyfrom:
95 if copyfrom:
96 p1flog = remotefilelog(self.opener, copyfrom, self.repo)
96 p1flog = remotefilelog(self.opener, copyfrom, self.repo)
97
97
98 pancestors.update(p1flog.ancestormap(realp1))
98 pancestors.update(p1flog.ancestormap(realp1))
99 queue.append(realp1)
99 queue.append(realp1)
100 visited.add(realp1)
100 visited.add(realp1)
101 if p2 != nullid:
101 if p2 != nullid:
102 pancestors.update(self.ancestormap(p2))
102 pancestors.update(self.ancestormap(p2))
103 queue.append(p2)
103 queue.append(p2)
104 visited.add(p2)
104 visited.add(p2)
105
105
106 ancestortext = ""
106 ancestortext = ""
107
107
108 # add the ancestors in topological order
108 # add the ancestors in topological order
109 while queue:
109 while queue:
110 c = queue.pop(0)
110 c = queue.pop(0)
111 pa1, pa2, ancestorlinknode, pacopyfrom = pancestors[c]
111 pa1, pa2, ancestorlinknode, pacopyfrom = pancestors[c]
112
112
113 pacopyfrom = pacopyfrom or ''
113 pacopyfrom = pacopyfrom or ''
114 ancestortext += "%s%s%s%s%s\0" % (
114 ancestortext += "%s%s%s%s%s\0" % (
115 c, pa1, pa2, ancestorlinknode, pacopyfrom)
115 c, pa1, pa2, ancestorlinknode, pacopyfrom)
116
116
117 if pa1 != nullid and pa1 not in visited:
117 if pa1 != nullid and pa1 not in visited:
118 queue.append(pa1)
118 queue.append(pa1)
119 visited.add(pa1)
119 visited.add(pa1)
120 if pa2 != nullid and pa2 not in visited:
120 if pa2 != nullid and pa2 not in visited:
121 queue.append(pa2)
121 queue.append(pa2)
122 visited.add(pa2)
122 visited.add(pa2)
123
123
124 data += ancestortext
124 data += ancestortext
125
125
126 return data
126 return data
127
127
128 def addrevision(self, text, transaction, linknode, p1, p2, cachedelta=None,
128 def addrevision(self, text, transaction, linknode, p1, p2, cachedelta=None,
129 node=None, flags=revlog.REVIDX_DEFAULT_FLAGS):
129 node=None, flags=revlog.REVIDX_DEFAULT_FLAGS):
130 # text passed to "addrevision" includes hg filelog metadata header
130 # text passed to "addrevision" includes hg filelog metadata header
131 if node is None:
131 if node is None:
132 node = storageutil.hashrevisionsha1(text, p1, p2)
132 node = storageutil.hashrevisionsha1(text, p1, p2)
133
133
134 meta, metaoffset = storageutil.parsemeta(text)
134 meta, metaoffset = storageutil.parsemeta(text)
135 rawtext, validatehash = self._processflags(text, flags, 'write')
135 rawtext, validatehash = self._processflags(text, flags, 'write')
136 return self.addrawrevision(rawtext, transaction, linknode, p1, p2,
136 return self.addrawrevision(rawtext, transaction, linknode, p1, p2,
137 node, flags, cachedelta,
137 node, flags, cachedelta,
138 _metatuple=(meta, metaoffset))
138 _metatuple=(meta, metaoffset))
139
139
140 def addrawrevision(self, rawtext, transaction, linknode, p1, p2, node,
140 def addrawrevision(self, rawtext, transaction, linknode, p1, p2, node,
141 flags, cachedelta=None, _metatuple=None):
141 flags, cachedelta=None, _metatuple=None):
142 if _metatuple:
142 if _metatuple:
143 # _metatuple: used by "addrevision" internally by remotefilelog
143 # _metatuple: used by "addrevision" internally by remotefilelog
144 # meta was parsed confidently
144 # meta was parsed confidently
145 meta, metaoffset = _metatuple
145 meta, metaoffset = _metatuple
146 else:
146 else:
147 # not from self.addrevision, but something else (repo._filecommit)
147 # not from self.addrevision, but something else (repo._filecommit)
148 # calls addrawrevision directly. remotefilelog needs to get and
148 # calls addrawrevision directly. remotefilelog needs to get and
149 # strip filelog metadata.
149 # strip filelog metadata.
150 # we don't have confidence about whether rawtext contains filelog
150 # we don't have confidence about whether rawtext contains filelog
151 # metadata or not (flag processor could replace it), so we just
151 # metadata or not (flag processor could replace it), so we just
152 # parse it as best-effort.
152 # parse it as best-effort.
153 # in LFS (flags != 0)'s case, the best way is to call LFS code to
153 # in LFS (flags != 0)'s case, the best way is to call LFS code to
154 # get the meta information, instead of storageutil.parsemeta.
154 # get the meta information, instead of storageutil.parsemeta.
155 meta, metaoffset = storageutil.parsemeta(rawtext)
155 meta, metaoffset = storageutil.parsemeta(rawtext)
156 if flags != 0:
156 if flags != 0:
157 # when flags != 0, be conservative and do not mangle rawtext, since
157 # when flags != 0, be conservative and do not mangle rawtext, since
158 # a read flag processor expects the text not being mangled at all.
158 # a read flag processor expects the text not being mangled at all.
159 metaoffset = 0
159 metaoffset = 0
160 if metaoffset:
160 if metaoffset:
161 # remotefilelog fileblob stores copy metadata in its ancestortext,
161 # remotefilelog fileblob stores copy metadata in its ancestortext,
162 # not its main blob. so we need to remove filelog metadata
162 # not its main blob. so we need to remove filelog metadata
163 # (containing copy information) from text.
163 # (containing copy information) from text.
164 blobtext = rawtext[metaoffset:]
164 blobtext = rawtext[metaoffset:]
165 else:
165 else:
166 blobtext = rawtext
166 blobtext = rawtext
167 data = self._createfileblob(blobtext, meta, flags, p1, p2, node,
167 data = self._createfileblob(blobtext, meta, flags, p1, p2, node,
168 linknode)
168 linknode)
169 self.repo.contentstore.addremotefilelognode(self.filename, node, data)
169 self.repo.contentstore.addremotefilelognode(self.filename, node, data)
170
170
171 return node
171 return node
172
172
173 def renamed(self, node):
173 def renamed(self, node):
174 ancestors = self.repo.metadatastore.getancestors(self.filename, node)
174 ancestors = self.repo.metadatastore.getancestors(self.filename, node)
175 p1, p2, linknode, copyfrom = ancestors[node]
175 p1, p2, linknode, copyfrom = ancestors[node]
176 if copyfrom:
176 if copyfrom:
177 return (copyfrom, p1)
177 return (copyfrom, p1)
178
178
179 return False
179 return False
180
180
181 def size(self, node):
181 def size(self, node):
182 """return the size of a given revision"""
182 """return the size of a given revision"""
183 return len(self.read(node))
183 return len(self.read(node))
184
184
185 rawsize = size
185 rawsize = size
186
186
187 def cmp(self, node, text):
187 def cmp(self, node, text):
188 """compare text with a given file revision
188 """compare text with a given file revision
189
189
190 returns True if text is different than what is stored.
190 returns True if text is different than what is stored.
191 """
191 """
192
192
193 if node == nullid:
193 if node == nullid:
194 return True
194 return True
195
195
196 nodetext = self.read(node)
196 nodetext = self.read(node)
197 return nodetext != text
197 return nodetext != text
198
198
199 def __nonzero__(self):
199 def __nonzero__(self):
200 return True
200 return True
201
201
202 __bool__ = __nonzero__
202 __bool__ = __nonzero__
203
203
204 def __len__(self):
204 def __len__(self):
205 if self.filename == '.hgtags':
205 if self.filename == '.hgtags':
206 # The length of .hgtags is used to fast path tag checking.
206 # The length of .hgtags is used to fast path tag checking.
207 # remotefilelog doesn't support .hgtags since the entire .hgtags
207 # remotefilelog doesn't support .hgtags since the entire .hgtags
208 # history is needed. Use the excludepattern setting to make
208 # history is needed. Use the excludepattern setting to make
209 # .hgtags a normal filelog.
209 # .hgtags a normal filelog.
210 return 0
210 return 0
211
211
212 raise RuntimeError("len not supported")
212 raise RuntimeError("len not supported")
213
213
214 def empty(self):
214 def empty(self):
215 return False
215 return False
216
216
217 def flags(self, node):
217 def flags(self, node):
218 if isinstance(node, int):
218 if isinstance(node, int):
219 raise error.ProgrammingError(
219 raise error.ProgrammingError(
220 'remotefilelog does not accept integer rev for flags')
220 'remotefilelog does not accept integer rev for flags')
221 store = self.repo.contentstore
221 store = self.repo.contentstore
222 return store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
222 return store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
223
223
224 def parents(self, node):
224 def parents(self, node):
225 if node == nullid:
225 if node == nullid:
226 return nullid, nullid
226 return nullid, nullid
227
227
228 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
228 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
229 p1, p2, linknode, copyfrom = ancestormap[node]
229 p1, p2, linknode, copyfrom = ancestormap[node]
230 if copyfrom:
230 if copyfrom:
231 p1 = nullid
231 p1 = nullid
232
232
233 return p1, p2
233 return p1, p2
234
234
235 def parentrevs(self, rev):
235 def parentrevs(self, rev):
236 # TODO(augie): this is a node and should be a rev, but for now
236 # TODO(augie): this is a node and should be a rev, but for now
237 # nothing in core seems to actually break.
237 # nothing in core seems to actually break.
238 return self.parents(rev)
238 return self.parents(rev)
239
239
240 def linknode(self, node):
240 def linknode(self, node):
241 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
241 ancestormap = self.repo.metadatastore.getancestors(self.filename, node)
242 p1, p2, linknode, copyfrom = ancestormap[node]
242 p1, p2, linknode, copyfrom = ancestormap[node]
243 return linknode
243 return linknode
244
244
245 def linkrev(self, node):
245 def linkrev(self, node):
246 return self.repo.unfiltered().changelog.rev(self.linknode(node))
246 return self.repo.unfiltered().changelog.rev(self.linknode(node))
247
247
248 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
248 def emitrevisions(self, nodes, nodesorder=None, revisiondata=False,
249 assumehaveparentrevisions=False, deltaprevious=False,
249 assumehaveparentrevisions=False, deltaprevious=False,
250 deltamode=None):
250 deltamode=None):
251 # we don't use any of these parameters here
251 # we don't use any of these parameters here
252 del nodesorder, revisiondata, assumehaveparentrevisions, deltaprevious
252 del nodesorder, revisiondata, assumehaveparentrevisions, deltaprevious
253 del deltamode
253 del deltamode
254 prevnode = None
254 prevnode = None
255 for node in nodes:
255 for node in nodes:
256 p1, p2 = self.parents(node)
256 p1, p2 = self.parents(node)
257 if prevnode is None:
257 if prevnode is None:
258 basenode = prevnode = p1
258 basenode = prevnode = p1
259 if basenode == node:
259 if basenode == node:
260 basenode = nullid
260 basenode = nullid
261 if basenode != nullid:
261 if basenode != nullid:
262 revision = None
262 revision = None
263 delta = self.revdiff(basenode, node)
263 delta = self.revdiff(basenode, node)
264 else:
264 else:
265 revision = self.revision(node, raw=True)
265 revision = self.rawdata(node)
266 delta = None
266 delta = None
267 yield revlog.revlogrevisiondelta(
267 yield revlog.revlogrevisiondelta(
268 node=node,
268 node=node,
269 p1node=p1,
269 p1node=p1,
270 p2node=p2,
270 p2node=p2,
271 linknode=self.linknode(node),
271 linknode=self.linknode(node),
272 basenode=basenode,
272 basenode=basenode,
273 flags=self.flags(node),
273 flags=self.flags(node),
274 baserevisionsize=None,
274 baserevisionsize=None,
275 revision=revision,
275 revision=revision,
276 delta=delta,
276 delta=delta,
277 )
277 )
278
278
279 def revdiff(self, node1, node2):
279 def revdiff(self, node1, node2):
280 return mdiff.textdiff(self.revision(node1, raw=True),
280 return mdiff.textdiff(self.rawdata(node1),
281 self.revision(node2, raw=True))
281 self.rawdata(node2))
282
282
283 def lookup(self, node):
283 def lookup(self, node):
284 if len(node) == 40:
284 if len(node) == 40:
285 node = bin(node)
285 node = bin(node)
286 if len(node) != 20:
286 if len(node) != 20:
287 raise error.LookupError(node, self.filename,
287 raise error.LookupError(node, self.filename,
288 _('invalid lookup input'))
288 _('invalid lookup input'))
289
289
290 return node
290 return node
291
291
292 def rev(self, node):
292 def rev(self, node):
293 # This is a hack to make TortoiseHG work.
293 # This is a hack to make TortoiseHG work.
294 return node
294 return node
295
295
296 def node(self, rev):
296 def node(self, rev):
297 # This is a hack.
297 # This is a hack.
298 if isinstance(rev, int):
298 if isinstance(rev, int):
299 raise error.ProgrammingError(
299 raise error.ProgrammingError(
300 'remotefilelog does not convert integer rev to node')
300 'remotefilelog does not convert integer rev to node')
301 return rev
301 return rev
302
302
303 def revision(self, node, raw=False):
303 def revision(self, node, raw=False):
304 """returns the revlog contents at this node.
304 """returns the revlog contents at this node.
305 this includes the meta data traditionally included in file revlogs.
305 this includes the meta data traditionally included in file revlogs.
306 this is generally only used for bundling and communicating with vanilla
306 this is generally only used for bundling and communicating with vanilla
307 hg clients.
307 hg clients.
308 """
308 """
309 if node == nullid:
309 if node == nullid:
310 return ""
310 return ""
311 if len(node) != 20:
311 if len(node) != 20:
312 raise error.LookupError(node, self.filename,
312 raise error.LookupError(node, self.filename,
313 _('invalid revision input'))
313 _('invalid revision input'))
314 if node == wdirid or node in wdirfilenodeids:
314 if node == wdirid or node in wdirfilenodeids:
315 raise error.WdirUnsupported
315 raise error.WdirUnsupported
316
316
317 store = self.repo.contentstore
317 store = self.repo.contentstore
318 rawtext = store.get(self.filename, node)
318 rawtext = store.get(self.filename, node)
319 if raw:
319 if raw:
320 return rawtext
320 return rawtext
321 flags = store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
321 flags = store.getmeta(self.filename, node).get(constants.METAKEYFLAG, 0)
322 if flags == 0:
322 if flags == 0:
323 return rawtext
323 return rawtext
324 text, verifyhash = self._processflags(rawtext, flags, 'read')
324 text, verifyhash = self._processflags(rawtext, flags, 'read')
325 return text
325 return text
326
326
327 def rawdata(self, node):
327 def rawdata(self, node):
328 return self.revision(node, raw=False)
328 return self.revision(node, raw=False)
329
329
330 def _processflags(self, text, flags, operation, raw=False):
330 def _processflags(self, text, flags, operation, raw=False):
331 # mostly copied from hg/mercurial/revlog.py
331 # mostly copied from hg/mercurial/revlog.py
332 validatehash = True
332 validatehash = True
333 orderedflags = revlog.REVIDX_FLAGS_ORDER
333 orderedflags = revlog.REVIDX_FLAGS_ORDER
334 if operation == 'write':
334 if operation == 'write':
335 orderedflags = reversed(orderedflags)
335 orderedflags = reversed(orderedflags)
336 for flag in orderedflags:
336 for flag in orderedflags:
337 if flag & flags:
337 if flag & flags:
338 vhash = True
338 vhash = True
339 if flag not in revlog._flagprocessors:
339 if flag not in revlog._flagprocessors:
340 message = _("missing processor for flag '%#x'") % (flag)
340 message = _("missing processor for flag '%#x'") % (flag)
341 raise revlog.RevlogError(message)
341 raise revlog.RevlogError(message)
342 readfunc, writefunc, rawfunc = revlog._flagprocessors[flag]
342 readfunc, writefunc, rawfunc = revlog._flagprocessors[flag]
343 if raw:
343 if raw:
344 vhash = rawfunc(self, text)
344 vhash = rawfunc(self, text)
345 elif operation == 'read':
345 elif operation == 'read':
346 text, vhash = readfunc(self, text)
346 text, vhash = readfunc(self, text)
347 elif operation == 'write':
347 elif operation == 'write':
348 text, vhash = writefunc(self, text)
348 text, vhash = writefunc(self, text)
349 validatehash = validatehash and vhash
349 validatehash = validatehash and vhash
350 return text, validatehash
350 return text, validatehash
351
351
352 def _read(self, id):
352 def _read(self, id):
353 """reads the raw file blob from disk, cache, or server"""
353 """reads the raw file blob from disk, cache, or server"""
354 fileservice = self.repo.fileservice
354 fileservice = self.repo.fileservice
355 localcache = fileservice.localcache
355 localcache = fileservice.localcache
356 cachekey = fileserverclient.getcachekey(self.repo.name, self.filename,
356 cachekey = fileserverclient.getcachekey(self.repo.name, self.filename,
357 id)
357 id)
358 try:
358 try:
359 return localcache.read(cachekey)
359 return localcache.read(cachekey)
360 except KeyError:
360 except KeyError:
361 pass
361 pass
362
362
363 localkey = fileserverclient.getlocalkey(self.filename, id)
363 localkey = fileserverclient.getlocalkey(self.filename, id)
364 localpath = os.path.join(self.localpath, localkey)
364 localpath = os.path.join(self.localpath, localkey)
365 try:
365 try:
366 return shallowutil.readfile(localpath)
366 return shallowutil.readfile(localpath)
367 except IOError:
367 except IOError:
368 pass
368 pass
369
369
370 fileservice.prefetch([(self.filename, id)])
370 fileservice.prefetch([(self.filename, id)])
371 try:
371 try:
372 return localcache.read(cachekey)
372 return localcache.read(cachekey)
373 except KeyError:
373 except KeyError:
374 pass
374 pass
375
375
376 raise error.LookupError(id, self.filename, _('no node'))
376 raise error.LookupError(id, self.filename, _('no node'))
377
377
378 def ancestormap(self, node):
378 def ancestormap(self, node):
379 return self.repo.metadatastore.getancestors(self.filename, node)
379 return self.repo.metadatastore.getancestors(self.filename, node)
380
380
381 def ancestor(self, a, b):
381 def ancestor(self, a, b):
382 if a == nullid or b == nullid:
382 if a == nullid or b == nullid:
383 return nullid
383 return nullid
384
384
385 revmap, parentfunc = self._buildrevgraph(a, b)
385 revmap, parentfunc = self._buildrevgraph(a, b)
386 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
386 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
387
387
388 ancs = ancestor.ancestors(parentfunc, revmap[a], revmap[b])
388 ancs = ancestor.ancestors(parentfunc, revmap[a], revmap[b])
389 if ancs:
389 if ancs:
390 # choose a consistent winner when there's a tie
390 # choose a consistent winner when there's a tie
391 return min(map(nodemap.__getitem__, ancs))
391 return min(map(nodemap.__getitem__, ancs))
392 return nullid
392 return nullid
393
393
394 def commonancestorsheads(self, a, b):
394 def commonancestorsheads(self, a, b):
395 """calculate all the heads of the common ancestors of nodes a and b"""
395 """calculate all the heads of the common ancestors of nodes a and b"""
396
396
397 if a == nullid or b == nullid:
397 if a == nullid or b == nullid:
398 return nullid
398 return nullid
399
399
400 revmap, parentfunc = self._buildrevgraph(a, b)
400 revmap, parentfunc = self._buildrevgraph(a, b)
401 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
401 nodemap = dict(((v, k) for (k, v) in revmap.iteritems()))
402
402
403 ancs = ancestor.commonancestorsheads(parentfunc, revmap[a], revmap[b])
403 ancs = ancestor.commonancestorsheads(parentfunc, revmap[a], revmap[b])
404 return map(nodemap.__getitem__, ancs)
404 return map(nodemap.__getitem__, ancs)
405
405
406 def _buildrevgraph(self, a, b):
406 def _buildrevgraph(self, a, b):
407 """Builds a numeric revision graph for the given two nodes.
407 """Builds a numeric revision graph for the given two nodes.
408 Returns a node->rev map and a rev->[revs] parent function.
408 Returns a node->rev map and a rev->[revs] parent function.
409 """
409 """
410 amap = self.ancestormap(a)
410 amap = self.ancestormap(a)
411 bmap = self.ancestormap(b)
411 bmap = self.ancestormap(b)
412
412
413 # Union the two maps
413 # Union the two maps
414 parentsmap = collections.defaultdict(list)
414 parentsmap = collections.defaultdict(list)
415 allparents = set()
415 allparents = set()
416 for mapping in (amap, bmap):
416 for mapping in (amap, bmap):
417 for node, pdata in mapping.iteritems():
417 for node, pdata in mapping.iteritems():
418 parents = parentsmap[node]
418 parents = parentsmap[node]
419 p1, p2, linknode, copyfrom = pdata
419 p1, p2, linknode, copyfrom = pdata
420 # Don't follow renames (copyfrom).
420 # Don't follow renames (copyfrom).
421 # remotefilectx.ancestor does that.
421 # remotefilectx.ancestor does that.
422 if p1 != nullid and not copyfrom:
422 if p1 != nullid and not copyfrom:
423 parents.append(p1)
423 parents.append(p1)
424 allparents.add(p1)
424 allparents.add(p1)
425 if p2 != nullid:
425 if p2 != nullid:
426 parents.append(p2)
426 parents.append(p2)
427 allparents.add(p2)
427 allparents.add(p2)
428
428
429 # Breadth first traversal to build linkrev graph
429 # Breadth first traversal to build linkrev graph
430 parentrevs = collections.defaultdict(list)
430 parentrevs = collections.defaultdict(list)
431 revmap = {}
431 revmap = {}
432 queue = collections.deque(((None, n) for n in parentsmap
432 queue = collections.deque(((None, n) for n in parentsmap
433 if n not in allparents))
433 if n not in allparents))
434 while queue:
434 while queue:
435 prevrev, current = queue.pop()
435 prevrev, current = queue.pop()
436 if current in revmap:
436 if current in revmap:
437 if prevrev:
437 if prevrev:
438 parentrevs[prevrev].append(revmap[current])
438 parentrevs[prevrev].append(revmap[current])
439 continue
439 continue
440
440
441 # Assign linkrevs in reverse order, so start at
441 # Assign linkrevs in reverse order, so start at
442 # len(parentsmap) and work backwards.
442 # len(parentsmap) and work backwards.
443 currentrev = len(parentsmap) - len(revmap) - 1
443 currentrev = len(parentsmap) - len(revmap) - 1
444 revmap[current] = currentrev
444 revmap[current] = currentrev
445
445
446 if prevrev:
446 if prevrev:
447 parentrevs[prevrev].append(currentrev)
447 parentrevs[prevrev].append(currentrev)
448
448
449 for parent in parentsmap.get(current):
449 for parent in parentsmap.get(current):
450 queue.appendleft((currentrev, parent))
450 queue.appendleft((currentrev, parent))
451
451
452 return revmap, parentrevs.__getitem__
452 return revmap, parentrevs.__getitem__
453
453
454 def strip(self, minlink, transaction):
454 def strip(self, minlink, transaction):
455 pass
455 pass
456
456
457 # misc unused things
457 # misc unused things
458 def files(self):
458 def files(self):
459 return []
459 return []
460
460
461 def checksize(self):
461 def checksize(self):
462 return 0, 0
462 return 0, 0
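In the remotefilelog class above, read() strips the vanilla-filelog copy metadata header from revision text: when the text starts with '\1\n', everything up to and including the second '\1\n' is metadata and only the remainder is file content. A small self-contained sketch of that rule (the copy/copyrev keys in the example are illustrative):

def stripfilelogmeta(t):
    # mirrors the rule in read(): a leading '\1\n' introduces a metadata
    # block terminated by a second '\1\n'; only the payload after it is
    # the file content
    if not t.startswith(b'\x01\n'):
        return t
    s = t.index(b'\x01\n', 2)
    return t[s + 2:]

plain = b'hello\n'
withmeta = (b'\x01\ncopy: foo\ncopyrev: ' + b'0' * 40 + b'\n\x01\n'
            + b'hello\n')
assert stripfilelogmeta(plain) == b'hello\n'
assert stripfilelogmeta(withmeta) == b'hello\n'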
@@ -1,404 +1,404 b''
1 # remotefilelogserver.py - server logic for a remotefilelog server
1 # remotefilelogserver.py - server logic for a remotefilelog server
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import errno
9 import errno
10 import os
10 import os
11 import stat
11 import stat
12 import time
12 import time
13 import zlib
13 import zlib
14
14
15 from mercurial.i18n import _
15 from mercurial.i18n import _
16 from mercurial.node import bin, hex, nullid
16 from mercurial.node import bin, hex, nullid
17 from mercurial import (
17 from mercurial import (
18 changegroup,
18 changegroup,
19 changelog,
19 changelog,
20 context,
20 context,
21 error,
21 error,
22 extensions,
22 extensions,
23 match,
23 match,
24 store,
24 store,
25 streamclone,
25 streamclone,
26 util,
26 util,
27 wireprotoserver,
27 wireprotoserver,
28 wireprototypes,
28 wireprototypes,
29 wireprotov1server,
29 wireprotov1server,
30 )
30 )
31 from . import (
31 from . import (
32 constants,
32 constants,
33 shallowutil,
33 shallowutil,
34 )
34 )
35
35
36 _sshv1server = wireprotoserver.sshv1protocolhandler
36 _sshv1server = wireprotoserver.sshv1protocolhandler
37
37
38 def setupserver(ui, repo):
38 def setupserver(ui, repo):
39 """Sets up a normal Mercurial repo so it can serve files to shallow repos.
39 """Sets up a normal Mercurial repo so it can serve files to shallow repos.
40 """
40 """
41 onetimesetup(ui)
41 onetimesetup(ui)
42
42
43 # don't send files to shallow clients during pulls
43 # don't send files to shallow clients during pulls
44 def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source,
44 def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source,
45 *args, **kwargs):
45 *args, **kwargs):
46 caps = self._bundlecaps or []
46 caps = self._bundlecaps or []
47 if constants.BUNDLE2_CAPABLITY in caps:
47 if constants.BUNDLE2_CAPABLITY in caps:
48 # only send files that don't match the specified patterns
48 # only send files that don't match the specified patterns
49 includepattern = None
49 includepattern = None
50 excludepattern = None
50 excludepattern = None
51 for cap in (self._bundlecaps or []):
51 for cap in (self._bundlecaps or []):
52 if cap.startswith("includepattern="):
52 if cap.startswith("includepattern="):
53 includepattern = cap[len("includepattern="):].split('\0')
53 includepattern = cap[len("includepattern="):].split('\0')
54 elif cap.startswith("excludepattern="):
54 elif cap.startswith("excludepattern="):
55 excludepattern = cap[len("excludepattern="):].split('\0')
55 excludepattern = cap[len("excludepattern="):].split('\0')
56
56
57 m = match.always()
57 m = match.always()
58 if includepattern or excludepattern:
58 if includepattern or excludepattern:
59 m = match.match(repo.root, '', None,
59 m = match.match(repo.root, '', None,
60 includepattern, excludepattern)
60 includepattern, excludepattern)
61
61
62 changedfiles = list([f for f in changedfiles if not m(f)])
62 changedfiles = list([f for f in changedfiles if not m(f)])
63 return orig(self, changedfiles, linknodes, commonrevs, source,
63 return orig(self, changedfiles, linknodes, commonrevs, source,
64 *args, **kwargs)
64 *args, **kwargs)
65
65
66 extensions.wrapfunction(
66 extensions.wrapfunction(
67 changegroup.cgpacker, 'generatefiles', generatefiles)
67 changegroup.cgpacker, 'generatefiles', generatefiles)
68
68
69 onetime = False
69 onetime = False
70 def onetimesetup(ui):
70 def onetimesetup(ui):
71 """Configures the wireprotocol for both clients and servers.
71 """Configures the wireprotocol for both clients and servers.
72 """
72 """
73 global onetime
73 global onetime
74 if onetime:
74 if onetime:
75 return
75 return
76 onetime = True
76 onetime = True
77
77
78 # support file content requests
78 # support file content requests
79 wireprotov1server.wireprotocommand(
79 wireprotov1server.wireprotocommand(
80 'x_rfl_getflogheads', 'path', permission='pull')(getflogheads)
80 'x_rfl_getflogheads', 'path', permission='pull')(getflogheads)
81 wireprotov1server.wireprotocommand(
81 wireprotov1server.wireprotocommand(
82 'x_rfl_getfiles', '', permission='pull')(getfiles)
82 'x_rfl_getfiles', '', permission='pull')(getfiles)
83 wireprotov1server.wireprotocommand(
83 wireprotov1server.wireprotocommand(
84 'x_rfl_getfile', 'file node', permission='pull')(getfile)
84 'x_rfl_getfile', 'file node', permission='pull')(getfile)
85
85
86 class streamstate(object):
86 class streamstate(object):
87 match = None
87 match = None
88 shallowremote = False
88 shallowremote = False
89 noflatmf = False
89 noflatmf = False
90 state = streamstate()
90 state = streamstate()
91
91
92 def stream_out_shallow(repo, proto, other):
92 def stream_out_shallow(repo, proto, other):
93 includepattern = None
93 includepattern = None
94 excludepattern = None
94 excludepattern = None
95 raw = other.get('includepattern')
95 raw = other.get('includepattern')
96 if raw:
96 if raw:
97 includepattern = raw.split('\0')
97 includepattern = raw.split('\0')
98 raw = other.get('excludepattern')
98 raw = other.get('excludepattern')
99 if raw:
99 if raw:
100 excludepattern = raw.split('\0')
100 excludepattern = raw.split('\0')
101
101
102 oldshallow = state.shallowremote
102 oldshallow = state.shallowremote
103 oldmatch = state.match
103 oldmatch = state.match
104 oldnoflatmf = state.noflatmf
104 oldnoflatmf = state.noflatmf
105 try:
105 try:
106 state.shallowremote = True
106 state.shallowremote = True
107 state.match = match.always()
107 state.match = match.always()
108 state.noflatmf = other.get('noflatmanifest') == 'True'
108 state.noflatmf = other.get('noflatmanifest') == 'True'
109 if includepattern or excludepattern:
109 if includepattern or excludepattern:
110 state.match = match.match(repo.root, '', None,
110 state.match = match.match(repo.root, '', None,
111 includepattern, excludepattern)
111 includepattern, excludepattern)
112 streamres = wireprotov1server.stream(repo, proto)
112 streamres = wireprotov1server.stream(repo, proto)
113
113
114 # Force the first value to execute, so the file list is computed
114 # Force the first value to execute, so the file list is computed
115 # within the try/finally scope
115 # within the try/finally scope
116 first = next(streamres.gen)
116 first = next(streamres.gen)
117 second = next(streamres.gen)
117 second = next(streamres.gen)
118 def gen():
118 def gen():
119 yield first
119 yield first
120 yield second
120 yield second
121 for value in streamres.gen:
121 for value in streamres.gen:
122 yield value
122 yield value
123 return wireprototypes.streamres(gen())
123 return wireprototypes.streamres(gen())
124 finally:
124 finally:
125 state.shallowremote = oldshallow
125 state.shallowremote = oldshallow
126 state.match = oldmatch
126 state.match = oldmatch
127 state.noflatmf = oldnoflatmf
127 state.noflatmf = oldnoflatmf
128
128
129 wireprotov1server.commands['stream_out_shallow'] = (stream_out_shallow, '*')
129 wireprotov1server.commands['stream_out_shallow'] = (stream_out_shallow, '*')
130
130
131 # don't clone filelogs to shallow clients
131 # don't clone filelogs to shallow clients
132 def _walkstreamfiles(orig, repo, matcher=None):
132 def _walkstreamfiles(orig, repo, matcher=None):
133 if state.shallowremote:
133 if state.shallowremote:
134 # if we are shallow ourselves, stream our local commits
134 # if we are shallow ourselves, stream our local commits
135 if shallowutil.isenabled(repo):
135 if shallowutil.isenabled(repo):
136 striplen = len(repo.store.path) + 1
136 striplen = len(repo.store.path) + 1
137 readdir = repo.store.rawvfs.readdir
137 readdir = repo.store.rawvfs.readdir
138 visit = [os.path.join(repo.store.path, 'data')]
138 visit = [os.path.join(repo.store.path, 'data')]
139 while visit:
139 while visit:
140 p = visit.pop()
140 p = visit.pop()
141 for f, kind, st in readdir(p, stat=True):
141 for f, kind, st in readdir(p, stat=True):
142 fp = p + '/' + f
142 fp = p + '/' + f
143 if kind == stat.S_IFREG:
143 if kind == stat.S_IFREG:
144 if not fp.endswith('.i') and not fp.endswith('.d'):
144 if not fp.endswith('.i') and not fp.endswith('.d'):
145 n = util.pconvert(fp[striplen:])
145 n = util.pconvert(fp[striplen:])
146 yield (store.decodedir(n), n, st.st_size)
146 yield (store.decodedir(n), n, st.st_size)
147 if kind == stat.S_IFDIR:
147 if kind == stat.S_IFDIR:
148 visit.append(fp)
148 visit.append(fp)
149
149
150 if 'treemanifest' in repo.requirements:
150 if 'treemanifest' in repo.requirements:
151 for (u, e, s) in repo.store.datafiles():
151 for (u, e, s) in repo.store.datafiles():
152 if (u.startswith('meta/') and
152 if (u.startswith('meta/') and
153 (u.endswith('.i') or u.endswith('.d'))):
153 (u.endswith('.i') or u.endswith('.d'))):
154 yield (u, e, s)
154 yield (u, e, s)
155
155
156 # Return .d and .i files that do not match the shallow pattern
156 # Return .d and .i files that do not match the shallow pattern
157 match = state.match
157 match = state.match
158 if match and not match.always():
158 if match and not match.always():
159 for (u, e, s) in repo.store.datafiles():
159 for (u, e, s) in repo.store.datafiles():
160 f = u[5:-2] # trim data/... and .i/.d
160 f = u[5:-2] # trim data/... and .i/.d
161 if not state.match(f):
161 if not state.match(f):
162 yield (u, e, s)
162 yield (u, e, s)
163
163
164 for x in repo.store.topfiles():
164 for x in repo.store.topfiles():
165 if state.noflatmf and x[0][:11] == '00manifest.':
165 if state.noflatmf and x[0][:11] == '00manifest.':
166 continue
166 continue
167 yield x
167 yield x
168
168
169 elif shallowutil.isenabled(repo):
169 elif shallowutil.isenabled(repo):
170 # don't allow cloning from a shallow repo to a full repo
170 # don't allow cloning from a shallow repo to a full repo
171 # since it would require fetching every version of every
171 # since it would require fetching every version of every
172 # file in order to create the revlogs.
172 # file in order to create the revlogs.
173 raise error.Abort(_("Cannot clone from a shallow repo "
173 raise error.Abort(_("Cannot clone from a shallow repo "
174 "to a full repo."))
174 "to a full repo."))
175 else:
175 else:
176 for x in orig(repo, matcher):
176 for x in orig(repo, matcher):
177 yield x
177 yield x
178
178
179 extensions.wrapfunction(streamclone, '_walkstreamfiles', _walkstreamfiles)
179 extensions.wrapfunction(streamclone, '_walkstreamfiles', _walkstreamfiles)
180
180
181 # expose remotefilelog capabilities
181 # expose remotefilelog capabilities
182 def _capabilities(orig, repo, proto):
182 def _capabilities(orig, repo, proto):
183 caps = orig(repo, proto)
183 caps = orig(repo, proto)
184 if (shallowutil.isenabled(repo) or ui.configbool('remotefilelog',
184 if (shallowutil.isenabled(repo) or ui.configbool('remotefilelog',
185 'server')):
185 'server')):
186 if isinstance(proto, _sshv1server):
186 if isinstance(proto, _sshv1server):
187 # legacy getfiles method which only works over ssh
187 # legacy getfiles method which only works over ssh
188 caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
188 caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
189 caps.append('x_rfl_getflogheads')
189 caps.append('x_rfl_getflogheads')
190 caps.append('x_rfl_getfile')
190 caps.append('x_rfl_getfile')
191 return caps
191 return caps
192 extensions.wrapfunction(wireprotov1server, '_capabilities', _capabilities)
192 extensions.wrapfunction(wireprotov1server, '_capabilities', _capabilities)
193
193
194 def _adjustlinkrev(orig, self, *args, **kwargs):
194 def _adjustlinkrev(orig, self, *args, **kwargs):
195 # When generating file blobs, taking the real path is too slow on large
195 # When generating file blobs, taking the real path is too slow on large
196 # repos, so force it to just return the linkrev directly.
196 # repos, so force it to just return the linkrev directly.
197 repo = self._repo
197 repo = self._repo
198 if util.safehasattr(repo, 'forcelinkrev') and repo.forcelinkrev:
198 if util.safehasattr(repo, 'forcelinkrev') and repo.forcelinkrev:
199 return self._filelog.linkrev(self._filelog.rev(self._filenode))
199 return self._filelog.linkrev(self._filelog.rev(self._filenode))
200 return orig(self, *args, **kwargs)
200 return orig(self, *args, **kwargs)
201
201
202 extensions.wrapfunction(
202 extensions.wrapfunction(
203 context.basefilectx, '_adjustlinkrev', _adjustlinkrev)
203 context.basefilectx, '_adjustlinkrev', _adjustlinkrev)
204
204
205 def _iscmd(orig, cmd):
205 def _iscmd(orig, cmd):
206 if cmd == 'x_rfl_getfiles':
206 if cmd == 'x_rfl_getfiles':
207 return False
207 return False
208 return orig(cmd)
208 return orig(cmd)
209
209
210 extensions.wrapfunction(wireprotoserver, 'iscmd', _iscmd)
210 extensions.wrapfunction(wireprotoserver, 'iscmd', _iscmd)
211
211
212 def _loadfileblob(repo, cachepath, path, node):
212 def _loadfileblob(repo, cachepath, path, node):
213 filecachepath = os.path.join(cachepath, path, hex(node))
213 filecachepath = os.path.join(cachepath, path, hex(node))
214 if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
214 if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
215 filectx = repo.filectx(path, fileid=node)
215 filectx = repo.filectx(path, fileid=node)
216 if filectx.node() == nullid:
216 if filectx.node() == nullid:
217 repo.changelog = changelog.changelog(repo.svfs)
217 repo.changelog = changelog.changelog(repo.svfs)
218 filectx = repo.filectx(path, fileid=node)
218 filectx = repo.filectx(path, fileid=node)
219
219
220 text = createfileblob(filectx)
220 text = createfileblob(filectx)
221 # TODO configurable compression engines
221 # TODO configurable compression engines
222 text = zlib.compress(text)
222 text = zlib.compress(text)
223
223
224 # everything should be user & group read/writable
224 # everything should be user & group read/writable
225 oldumask = os.umask(0o002)
225 oldumask = os.umask(0o002)
226 try:
226 try:
227 dirname = os.path.dirname(filecachepath)
227 dirname = os.path.dirname(filecachepath)
228 if not os.path.exists(dirname):
228 if not os.path.exists(dirname):
229 try:
229 try:
230 os.makedirs(dirname)
230 os.makedirs(dirname)
231 except OSError as ex:
231 except OSError as ex:
232 if ex.errno != errno.EEXIST:
232 if ex.errno != errno.EEXIST:
233 raise
233 raise
234
234
235 f = None
235 f = None
236 try:
236 try:
237 f = util.atomictempfile(filecachepath, "wb")
237 f = util.atomictempfile(filecachepath, "wb")
238 f.write(text)
238 f.write(text)
239 except (IOError, OSError):
239 except (IOError, OSError):
240 # Don't abort if the user only has permission to read,
240 # Don't abort if the user only has permission to read,
241 # and not write.
241 # and not write.
242 pass
242 pass
243 finally:
243 finally:
244 if f:
244 if f:
245 f.close()
245 f.close()
246 finally:
246 finally:
247 os.umask(oldumask)
247 os.umask(oldumask)
248 else:
248 else:
249 with open(filecachepath, "rb") as f:
249 with open(filecachepath, "rb") as f:
250 text = f.read()
250 text = f.read()
251 return text
251 return text
252
252
253 def getflogheads(repo, proto, path):
253 def getflogheads(repo, proto, path):
254 """A server api for requesting a filelog's heads
254 """A server api for requesting a filelog's heads
255 """
255 """
256 flog = repo.file(path)
256 flog = repo.file(path)
257 heads = flog.heads()
257 heads = flog.heads()
258 return '\n'.join((hex(head) for head in heads if head != nullid))
258 return '\n'.join((hex(head) for head in heads if head != nullid))
259
259
260 def getfile(repo, proto, file, node):
260 def getfile(repo, proto, file, node):
261 """A server api for requesting a particular version of a file. Can be used
261 """A server api for requesting a particular version of a file. Can be used
262 in batches to request many files at once. The return protocol is:
262 in batches to request many files at once. The return protocol is:
263 <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
263 <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
264 non-zero for an error.
264 non-zero for an error.
265
265
266 data is a compressed blob with revlog flag and ancestors information. See
266 data is a compressed blob with revlog flag and ancestors information. See
267 createfileblob for its content.
267 createfileblob for its content.
268 """
268 """
269 if shallowutil.isenabled(repo):
269 if shallowutil.isenabled(repo):
270 return '1\0' + _('cannot fetch remote files from shallow repo')
270 return '1\0' + _('cannot fetch remote files from shallow repo')
271 cachepath = repo.ui.config("remotefilelog", "servercachepath")
271 cachepath = repo.ui.config("remotefilelog", "servercachepath")
272 if not cachepath:
272 if not cachepath:
273 cachepath = os.path.join(repo.path, "remotefilelogcache")
273 cachepath = os.path.join(repo.path, "remotefilelogcache")
274 node = bin(node.strip())
274 node = bin(node.strip())
275 if node == nullid:
275 if node == nullid:
276 return '0\0'
276 return '0\0'
277 return '0\0' + _loadfileblob(repo, cachepath, file, node)
277 return '0\0' + _loadfileblob(repo, cachepath, file, node)
278
278
279 def getfiles(repo, proto):
279 def getfiles(repo, proto):
280 """A server api for requesting particular versions of particular files.
280 """A server api for requesting particular versions of particular files.
281 """
281 """
282 if shallowutil.isenabled(repo):
282 if shallowutil.isenabled(repo):
283 raise error.Abort(_('cannot fetch remote files from shallow repo'))
283 raise error.Abort(_('cannot fetch remote files from shallow repo'))
284 if not isinstance(proto, _sshv1server):
284 if not isinstance(proto, _sshv1server):
285 raise error.Abort(_('cannot fetch remote files over non-ssh protocol'))
285 raise error.Abort(_('cannot fetch remote files over non-ssh protocol'))
286
286
287 def streamer():
287 def streamer():
288 fin = proto._fin
288 fin = proto._fin
289
289
290 cachepath = repo.ui.config("remotefilelog", "servercachepath")
290 cachepath = repo.ui.config("remotefilelog", "servercachepath")
291 if not cachepath:
291 if not cachepath:
292 cachepath = os.path.join(repo.path, "remotefilelogcache")
292 cachepath = os.path.join(repo.path, "remotefilelogcache")
293
293
294 while True:
294 while True:
295 request = fin.readline()[:-1]
295 request = fin.readline()[:-1]
296 if not request:
296 if not request:
297 break
297 break
298
298
299 node = bin(request[:40])
299 node = bin(request[:40])
300 if node == nullid:
300 if node == nullid:
301 yield '0\n'
301 yield '0\n'
302 continue
302 continue
303
303
304 path = request[40:]
304 path = request[40:]
305
305
306 text = _loadfileblob(repo, cachepath, path, node)
306 text = _loadfileblob(repo, cachepath, path, node)
307
307
308 yield '%d\n%s' % (len(text), text)
308 yield '%d\n%s' % (len(text), text)
309
309
310 # it would be better to only flush after processing a whole batch
310 # it would be better to only flush after processing a whole batch
311 # but currently we don't know if there are more requests coming
311 # but currently we don't know if there are more requests coming
312 proto._fout.flush()
312 proto._fout.flush()
313 return wireprototypes.streamres(streamer())
313 return wireprototypes.streamres(streamer())
314
314
315 def createfileblob(filectx):
315 def createfileblob(filectx):
316 """
316 """
317 format:
317 format:
318 v0:
318 v0:
319 str(len(rawtext)) + '\0' + rawtext + ancestortext
319 str(len(rawtext)) + '\0' + rawtext + ancestortext
320 v1:
320 v1:
321 'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
321 'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
322 metalist := metalist + '\n' + meta | meta
322 metalist := metalist + '\n' + meta | meta
323 meta := sizemeta | flagmeta
323 meta := sizemeta | flagmeta
324 sizemeta := METAKEYSIZE + str(len(rawtext))
324 sizemeta := METAKEYSIZE + str(len(rawtext))
325 flagmeta := METAKEYFLAG + str(flag)
325 flagmeta := METAKEYFLAG + str(flag)
326
326
327 note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
327 note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
328 length of 1.
328 length of 1.
329 """
329 """
330 flog = filectx.filelog()
330 flog = filectx.filelog()
331 frev = filectx.filerev()
331 frev = filectx.filerev()
332 revlogflags = flog._revlog.flags(frev)
332 revlogflags = flog._revlog.flags(frev)
333 if revlogflags == 0:
333 if revlogflags == 0:
334 # normal files
334 # normal files
335 text = filectx.data()
335 text = filectx.data()
336 else:
336 else:
337 # lfs, read raw revision data
337 # lfs, read raw revision data
338 text = flog.revision(frev, raw=True)
338 text = flog.rawdata(frev)
339
339
340 repo = filectx._repo
340 repo = filectx._repo
341
341
342 ancestors = [filectx]
342 ancestors = [filectx]
343
343
344 try:
344 try:
345 repo.forcelinkrev = True
345 repo.forcelinkrev = True
346 ancestors.extend([f for f in filectx.ancestors()])
346 ancestors.extend([f for f in filectx.ancestors()])
347
347
348 ancestortext = ""
348 ancestortext = ""
349 for ancestorctx in ancestors:
349 for ancestorctx in ancestors:
350 parents = ancestorctx.parents()
350 parents = ancestorctx.parents()
351 p1 = nullid
351 p1 = nullid
352 p2 = nullid
352 p2 = nullid
353 if len(parents) > 0:
353 if len(parents) > 0:
354 p1 = parents[0].filenode()
354 p1 = parents[0].filenode()
355 if len(parents) > 1:
355 if len(parents) > 1:
356 p2 = parents[1].filenode()
356 p2 = parents[1].filenode()
357
357
358 copyname = ""
358 copyname = ""
359 rename = ancestorctx.renamed()
359 rename = ancestorctx.renamed()
360 if rename:
360 if rename:
361 copyname = rename[0]
361 copyname = rename[0]
362 linknode = ancestorctx.node()
362 linknode = ancestorctx.node()
363 ancestortext += "%s%s%s%s%s\0" % (
363 ancestortext += "%s%s%s%s%s\0" % (
364 ancestorctx.filenode(), p1, p2, linknode,
364 ancestorctx.filenode(), p1, p2, linknode,
365 copyname)
365 copyname)
366 finally:
366 finally:
367 repo.forcelinkrev = False
367 repo.forcelinkrev = False
368
368
369 header = shallowutil.buildfileblobheader(len(text), revlogflags)
369 header = shallowutil.buildfileblobheader(len(text), revlogflags)
370
370
371 return "%s\0%s%s" % (header, text, ancestortext)
371 return "%s\0%s%s" % (header, text, ancestortext)
372
372
373 def gcserver(ui, repo):
373 def gcserver(ui, repo):
374 if not repo.ui.configbool("remotefilelog", "server"):
374 if not repo.ui.configbool("remotefilelog", "server"):
375 return
375 return
376
376
377 neededfiles = set()
377 neededfiles = set()
378 heads = repo.revs("heads(tip~25000:) - null")
378 heads = repo.revs("heads(tip~25000:) - null")
379
379
380 cachepath = repo.vfs.join("remotefilelogcache")
380 cachepath = repo.vfs.join("remotefilelogcache")
381 for head in heads:
381 for head in heads:
382 mf = repo[head].manifest()
382 mf = repo[head].manifest()
383 for filename, filenode in mf.iteritems():
383 for filename, filenode in mf.iteritems():
384 filecachepath = os.path.join(cachepath, filename, hex(filenode))
384 filecachepath = os.path.join(cachepath, filename, hex(filenode))
385 neededfiles.add(filecachepath)
385 neededfiles.add(filecachepath)
386
386
387 # delete unneeded older files
387 # delete unneeded older files
388 days = repo.ui.configint("remotefilelog", "serverexpiration")
388 days = repo.ui.configint("remotefilelog", "serverexpiration")
389 expiration = time.time() - (days * 24 * 60 * 60)
389 expiration = time.time() - (days * 24 * 60 * 60)
390
390
391 progress = ui.makeprogress(_("removing old server cache"), unit="files")
391 progress = ui.makeprogress(_("removing old server cache"), unit="files")
392 progress.update(0)
392 progress.update(0)
393 for root, dirs, files in os.walk(cachepath):
393 for root, dirs, files in os.walk(cachepath):
394 for file in files:
394 for file in files:
395 filepath = os.path.join(root, file)
395 filepath = os.path.join(root, file)
396 progress.increment()
396 progress.increment()
397 if filepath in neededfiles:
397 if filepath in neededfiles:
398 continue
398 continue
399
399
400 stat = os.stat(filepath)
400 stat = os.stat(filepath)
401 if stat.st_mtime < expiration:
401 if stat.st_mtime < expiration:
402 os.remove(filepath)
402 os.remove(filepath)
403
403
404 progress.complete()
404 progress.complete()
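The createfileblob docstring above spells out the v1 fileblob layout: a 'v1' line, newline-separated single-character metadata entries, a NUL byte, then the raw text followed by the ancestor records. A hedged sketch of a parser for that header; the one-character keys 's' (size) and 'f' (flags) are assumptions here, standing in for METAKEYSIZE and METAKEYFLAG from the extension's constants module:

def parsefileblobv1(blob):
    # split the header (everything before the first NUL) from the body
    header, _, body = blob.partition(b'\0')
    lines = header.split(b'\n')
    assert lines[0] == b'v1'
    meta = {}
    for line in lines[1:]:
        # each meta entry is a single-character key followed by its value
        meta[line[:1]] = int(line[1:])
    size = meta[b's']          # assumed key for METAKEYSIZE
    flags = meta.get(b'f', 0)  # assumed key for METAKEYFLAG
    rawtext, ancestortext = body[:size], body[size:]
    return flags, rawtext, ancestortext

blob = b'v1\ns5\nf0\0hello' + b'<ancestor records>'
flags, rawtext, ancestortext = parsefileblobv1(blob)
assert (flags, rawtext, ancestortext) == (0, b'hello', b'<ancestor records>')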