##// END OF EJS Templates
remotefilelog: drop compat code for "getbundle_shallow" wireprotocol command...
Pulkit Goyal -
r40552:2dd3a020 default
parent child Browse files
Show More
@@ -1,418 +1,406
1 1 # remotefilelogserver.py - server logic for a remotefilelog server
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import errno
10 10 import os
11 11 import stat
12 12 import time
13 13 import zlib
14 14
15 15 from mercurial.i18n import _
16 16 from mercurial.node import bin, hex, nullid
17 17 from mercurial import (
18 18 changegroup,
19 19 changelog,
20 20 context,
21 21 error,
22 22 extensions,
23 23 match,
24 24 store,
25 25 streamclone,
26 26 util,
27 27 wireprotoserver,
28 28 wireprototypes,
29 29 wireprotov1server,
30 30 )
31 31 from . import (
32 32 constants,
33 33 shallowutil,
34 34 )
35 35
36 36 _sshv1server = wireprotoserver.sshv1protocolhandler
37 37
def setupserver(ui, repo):
    """Sets up a normal Mercurial repo so it can serve files to shallow repos.

    Performs the one-time wireprotocol setup, then wraps changegroup file
    generation so that pulls by shallow (bundle2-capable) clients only
    receive files excluded from their shallow pattern.
    """
    onetimesetup(ui)

    # don't send files to shallow clients during pulls
    def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source,
                      *args, **kwargs):
        caps = self._bundlecaps or []
        if constants.BUNDLE2_CAPABLITY in caps:
            # only send files that don't match the specified patterns
            includepattern = None
            excludepattern = None
            for cap in (self._bundlecaps or []):
                if cap.startswith("includepattern="):
                    includepattern = cap[len("includepattern="):].split('\0')
                elif cap.startswith("excludepattern="):
                    excludepattern = cap[len("excludepattern="):].split('\0')

            m = match.always(repo.root, '')
            if includepattern or excludepattern:
                m = match.match(repo.root, '', None,
                                includepattern, excludepattern)

            # keep only the files the shallow matcher does NOT cover
            # (the comprehension already yields a list; no list() wrapper)
            changedfiles = [f for f in changedfiles if not m(f)]
        return orig(self, changedfiles, linknodes, commonrevs, source,
                    *args, **kwargs)

    extensions.wrapfunction(
        changegroup.cgpacker, 'generatefiles', generatefiles)
68 68
# guard so the global wireprotocol wrapping below only ever happens once
onetime = False
def onetimesetup(ui):
    """Configures the wireprotocol for both clients and servers.

    Registers the remotefilelog wire commands (x_rfl_getflogheads,
    x_rfl_getfiles, x_rfl_getfile and stream_out_shallow) and wraps core
    Mercurial hooks so streaming clones and capability advertisement are
    shallow-aware.  The obsolete getbundle_shallow compat command has been
    dropped (clients now use plain getbundle with the shallow bundlecap).
    """
    global onetime
    if onetime:
        return
    onetime = True

    # support file content requests
    wireprotov1server.wireprotocommand(
        'x_rfl_getflogheads', 'path', permission='pull')(getflogheads)
    wireprotov1server.wireprotocommand(
        'x_rfl_getfiles', '', permission='pull')(getfiles)
    wireprotov1server.wireprotocommand(
        'x_rfl_getfile', 'file node', permission='pull')(getfile)

    # per-request shallow-streaming state, toggled by stream_out_shallow and
    # consulted by the _walkstreamfiles wrapper below
    class streamstate(object):
        match = None
        shallowremote = False
        noflatmf = False
    state = streamstate()

    def stream_out_shallow(repo, proto, other):
        includepattern = None
        excludepattern = None
        raw = other.get('includepattern')
        if raw:
            includepattern = raw.split('\0')
        raw = other.get('excludepattern')
        if raw:
            excludepattern = raw.split('\0')

        oldshallow = state.shallowremote
        oldmatch = state.match
        oldnoflatmf = state.noflatmf
        try:
            state.shallowremote = True
            state.match = match.always(repo.root, '')
            state.noflatmf = other.get('noflatmanifest') == 'True'
            if includepattern or excludepattern:
                state.match = match.match(repo.root, '', None,
                                          includepattern, excludepattern)
            streamres = wireprotov1server.stream(repo, proto)

            # Force the first value to execute, so the file list is computed
            # within the try/finally scope
            first = next(streamres.gen)
            second = next(streamres.gen)
            def gen():
                yield first
                yield second
                for value in streamres.gen:
                    yield value
            return wireprototypes.streamres(gen())
        finally:
            state.shallowremote = oldshallow
            state.match = oldmatch
            state.noflatmf = oldnoflatmf

    wireprotov1server.commands['stream_out_shallow'] = (stream_out_shallow, '*')

    # don't clone filelogs to shallow clients
    def _walkstreamfiles(orig, repo, matcher=None):
        if state.shallowremote:
            # if we are shallow ourselves, stream our local commits
            if shallowutil.isenabled(repo):
                striplen = len(repo.store.path) + 1
                readdir = repo.store.rawvfs.readdir
                visit = [os.path.join(repo.store.path, 'data')]
                while visit:
                    p = visit.pop()
                    for f, kind, st in readdir(p, stat=True):
                        fp = p + '/' + f
                        if kind == stat.S_IFREG:
                            if not fp.endswith('.i') and not fp.endswith('.d'):
                                n = util.pconvert(fp[striplen:])
                                yield (store.decodedir(n), n, st.st_size)
                        if kind == stat.S_IFDIR:
                            visit.append(fp)

            if 'treemanifest' in repo.requirements:
                for (u, e, s) in repo.store.datafiles():
                    if (u.startswith('meta/') and
                        (u.endswith('.i') or u.endswith('.d'))):
                        yield (u, e, s)

            # Return .d and .i files that do not match the shallow pattern
            match = state.match
            if match and not match.always():
                for (u, e, s) in repo.store.datafiles():
                    f = u[5:-2]  # trim data/...  and .i/.d
                    if not state.match(f):
                        yield (u, e, s)

            for x in repo.store.topfiles():
                if state.noflatmf and x[0][:11] == '00manifest.':
                    continue
                yield x

        elif shallowutil.isenabled(repo):
            # don't allow cloning from a shallow repo to a full repo
            # since it would require fetching every version of every
            # file in order to create the revlogs.
            raise error.Abort(_("Cannot clone from a shallow repo "
                                "to a full repo."))
        else:
            for x in orig(repo, matcher):
                yield x

    extensions.wrapfunction(streamclone, '_walkstreamfiles', _walkstreamfiles)

    # expose remotefilelog capabilities
    def _capabilities(orig, repo, proto):
        caps = orig(repo, proto)
        if (shallowutil.isenabled(repo) or ui.configbool('remotefilelog',
                                                         'server')):
            if isinstance(proto, _sshv1server):
                # legacy getfiles method which only works over ssh
                caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
            caps.append('x_rfl_getflogheads')
            caps.append('x_rfl_getfile')
        return caps
    extensions.wrapfunction(wireprotov1server, '_capabilities', _capabilities)

    def _adjustlinkrev(orig, self, *args, **kwargs):
        # When generating file blobs, taking the real path is too slow on large
        # repos, so force it to just return the linkrev directly.
        repo = self._repo
        if util.safehasattr(repo, 'forcelinkrev') and repo.forcelinkrev:
            return self._filelog.linkrev(self._filelog.rev(self._filenode))
        return orig(self, *args, **kwargs)

    extensions.wrapfunction(
        context.basefilectx, '_adjustlinkrev', _adjustlinkrev)

    def _iscmd(orig, cmd):
        # x_rfl_getfiles is a streaming ssh-only command; hide it from the
        # generic HTTP command dispatcher
        if cmd == 'x_rfl_getfiles':
            return False
        return orig(cmd)

    extensions.wrapfunction(wireprotoserver, 'iscmd', _iscmd)
223 211
def _loadfileblob(repo, cachepath, path, node):
    """Return the zlib-compressed file blob for ``path`` at ``node``.

    Serves from the on-disk server cache when a non-empty cached copy
    exists; otherwise builds the blob, best-effort writes it into the
    cache, and returns it.
    """
    blobpath = os.path.join(cachepath, path, hex(node))
    if os.path.exists(blobpath) and os.path.getsize(blobpath) > 0:
        # cache hit: hand back the stored compressed blob verbatim
        with open(blobpath, "r") as cachedfile:
            return cachedfile.read()

    filectx = repo.filectx(path, fileid=node)
    if filectx.node() == nullid:
        # reload the changelog in case it went stale under us, then retry
        repo.changelog = changelog.changelog(repo.svfs)
        filectx = repo.filectx(path, fileid=node)

    # TODO configurable compression engines
    text = zlib.compress(createfileblob(filectx))

    # everything should be user & group read/writable
    oldumask = os.umask(0o002)
    try:
        dirname = os.path.dirname(blobpath)
        if not os.path.exists(dirname):
            try:
                os.makedirs(dirname)
            except OSError as ex:
                if ex.errno != errno.EEXIST:
                    raise

        tmpfile = None
        try:
            tmpfile = util.atomictempfile(blobpath, "w")
            tmpfile.write(text)
        except (IOError, OSError):
            # Don't abort if the user only has permission to read,
            # and not write.
            pass
        finally:
            if tmpfile:
                tmpfile.close()
    finally:
        os.umask(oldumask)
    return text
264 252
def getflogheads(repo, proto, path):
    """A server api for requesting a filelog's heads
    """
    filelog = repo.file(path)
    hexheads = [hex(head) for head in filelog.heads() if head != nullid]
    return '\n'.join(hexheads)
271 259
def getfile(repo, proto, file, node):
    """A server api for requesting a particular version of a file. Can be used
    in batches to request many files at once. The return protocol is:
    <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
    non-zero for an error.

    data is a compressed blob with revlog flag and ancestors information. See
    createfileblob for its content.
    """
    if shallowutil.isenabled(repo):
        # a shallow server has no file history of its own to serve
        return '1\0' + _('cannot fetch remote files from shallow repo')
    cachepath = (repo.ui.config("remotefilelog", "servercachepath")
                 or os.path.join(repo.path, "remotefilelogcache"))
    node = bin(node.strip())
    if node == nullid:
        # the null revision carries no data, just report success
        return '0\0'
    return '0\0' + _loadfileblob(repo, cachepath, file, node)
290 278
def getfiles(repo, proto):
    """A server api for requesting particular versions of particular files.
    """
    if shallowutil.isenabled(repo):
        raise error.Abort(_('cannot fetch remote files from shallow repo'))
    if not isinstance(proto, _sshv1server):
        raise error.Abort(_('cannot fetch remote files over non-ssh protocol'))

    def blobstream():
        fin = proto._fin

        cachepath = (repo.ui.config("remotefilelog", "servercachepath")
                     or os.path.join(repo.path, "remotefilelogcache"))

        while True:
            # one request per line: 40 hex chars of node followed by the path;
            # an empty line terminates the batch
            request = fin.readline()[:-1]
            if not request:
                break

            node = bin(request[:40])
            if node == nullid:
                yield '0\n'
                continue

            path = request[40:]

            blob = _loadfileblob(repo, cachepath, path, node)

            yield '%d\n%s' % (len(blob), blob)

            # it would be better to only flush after processing a whole batch
            # but currently we don't know if there are more requests coming
            proto._fout.flush()
    return wireprototypes.streamres(blobstream())
326 314
def createfileblob(filectx):
    """
    format:
        v0:
            str(len(rawtext)) + '\0' + rawtext + ancestortext
        v1:
            'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
            metalist := metalist + '\n' + meta | meta
            meta := sizemeta | flagmeta
            sizemeta := METAKEYSIZE + str(len(rawtext))
            flagmeta := METAKEYFLAG + str(flag)

            note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
            length of 1.
    """
    flog = filectx.filelog()
    frev = filectx.filerev()
    revlogflags = flog._revlog.flags(frev)
    if revlogflags == 0:
        # normal files
        text = filectx.data()
    else:
        # lfs, read raw revision data
        text = flog.revision(frev, raw=True)

    repo = filectx._repo

    ancestors = [filectx]

    try:
        # force linkrev shortcuts while walking ancestors (see _adjustlinkrev
        # wrapper); restored in the finally below
        repo.forcelinkrev = True
        # extend() consumes the iterator directly; no need to build an
        # intermediate list first
        ancestors.extend(filectx.ancestors())

        ancestortext = ""
        for ancestorctx in ancestors:
            parents = ancestorctx.parents()
            p1 = nullid
            p2 = nullid
            if len(parents) > 0:
                p1 = parents[0].filenode()
            if len(parents) > 1:
                p2 = parents[1].filenode()

            copyname = ""
            rename = ancestorctx.renamed()
            if rename:
                copyname = rename[0]
            linknode = ancestorctx.node()
            ancestortext += "%s%s%s%s%s\0" % (
                ancestorctx.filenode(), p1, p2, linknode,
                copyname)
    finally:
        repo.forcelinkrev = False

    header = shallowutil.buildfileblobheader(len(text), revlogflags)

    return "%s\0%s%s" % (header, text, ancestortext)
384 372
def gcserver(ui, repo):
    """Remove expired, unreferenced blobs from the server cache.

    Collects the cache paths reachable from recent heads (tip~25000:),
    then walks the cache directory deleting any file that is not needed
    and is older than the configured expiration window.
    """
    if not repo.ui.configbool("remotefilelog", "server"):
        return

    neededfiles = set()
    heads = repo.revs("heads(tip~25000:) - null")

    cachepath = repo.vfs.join("remotefilelogcache")
    for head in heads:
        mf = repo[head].manifest()
        for filename, filenode in mf.iteritems():
            filecachepath = os.path.join(cachepath, filename, hex(filenode))
            neededfiles.add(filecachepath)

    # delete unneeded older files
    days = repo.ui.configint("remotefilelog", "serverexpiration")
    expiration = time.time() - (days * 24 * 60 * 60)

    _removing = _("removing old server cache")
    count = 0
    ui.progress(_removing, count, unit="files")
    for root, dirs, files in os.walk(cachepath):
        for file in files:
            filepath = os.path.join(root, file)
            count += 1
            ui.progress(_removing, count, unit="files")
            if filepath in neededfiles:
                continue

            # 'st', not 'stat': don't shadow the module-level stat import
            st = os.stat(filepath)
            if st.st_mtime < expiration:
                os.remove(filepath)

    ui.progress(_removing, None)
General Comments 0
You need to be logged in to leave comments. Login now