remotefilelogserver: add a matcher argument to _walkstreamfiles()
Author: Pulkit Goyal
Changeset: r40550:1c34b31a (default branch)
1 1 # remotefilelogserver.py - server logic for a remotefilelog server
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import errno
10 10 import os
11 11 import stat
12 12 import time
13 13 import zlib
14 14
15 15 from mercurial.i18n import _
16 16 from mercurial.node import bin, hex, nullid
17 17 from mercurial import (
18 18 changegroup,
19 19 changelog,
20 20 context,
21 21 error,
22 22 extensions,
23 23 match,
24 24 store,
25 25 streamclone,
26 26 util,
27 27 wireprotoserver,
28 28 wireprototypes,
29 29 wireprotov1server,
30 30 )
31 31 from . import (
32 32 constants,
33 33 shallowutil,
34 34 )
35 35
# Handler class for the SSH v1 wire protocol; used below to recognize ssh
# peers, since the legacy getfiles command only works over ssh.
_sshv1server = wireprotoserver.sshv1protocolhandler

def setupserver(ui, repo):
    """Sets up a normal Mercurial repo so it can serve files to shallow repos.
    """
    onetimesetup(ui)

    # don't send files to shallow clients during pulls
    def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source,
                      *args, **kwargs):
        caps = self._bundlecaps or []
        if constants.BUNDLE2_CAPABLITY in caps:
            # only send files that don't match the specified patterns
            includepattern = None
            excludepattern = None
            # reuse the caps list bound above instead of re-evaluating
            # self._bundlecaps
            for cap in caps:
                if cap.startswith("includepattern="):
                    includepattern = cap[len("includepattern="):].split('\0')
                elif cap.startswith("excludepattern="):
                    excludepattern = cap[len("excludepattern="):].split('\0')

            m = match.always(repo.root, '')
            if includepattern or excludepattern:
                m = match.match(repo.root, '', None,
                                includepattern, excludepattern)

            # keep only files outside the shallow match; matched files are
            # fetched lazily by the shallow client instead
            changedfiles = [f for f in changedfiles if not m(f)]
        return orig(self, changedfiles, linknodes, commonrevs, source,
                    *args, **kwargs)

    extensions.wrapfunction(
        changegroup.cgpacker, 'generatefiles', generatefiles)
68 68
# Guard so onetimesetup() performs its process-wide wireprotocol
# registration only once.
onetime = False
def onetimesetup(ui):
    """Configures the wireprotocol for both clients and servers.
    """
    global onetime
    if onetime:
        return
    onetime = True

    # support file content requests
    wireprotov1server.wireprotocommand(
        'x_rfl_getflogheads', 'path', permission='pull')(getflogheads)
    wireprotov1server.wireprotocommand(
        'x_rfl_getfiles', '', permission='pull')(getfiles)
    wireprotov1server.wireprotocommand(
        'x_rfl_getfile', 'file node', permission='pull')(getfile)

    # Mutable per-request stream state: set by stream_out_shallow() below
    # and consulted by the wrapped _walkstreamfiles().
    class streamstate(object):
        match = None
        shallowremote = False
        noflatmf = False
    state = streamstate()

    def stream_out_shallow(repo, proto, other):
        # Decode the optional include/exclude pattern lists sent by the
        # client ('\0'-separated strings).
        includepattern = None
        excludepattern = None
        raw = other.get('includepattern')
        if raw:
            includepattern = raw.split('\0')
        raw = other.get('excludepattern')
        if raw:
            excludepattern = raw.split('\0')

        # Save the previous stream state so concurrent/nested use is
        # restored in the finally block below.
        oldshallow = state.shallowremote
        oldmatch = state.match
        oldnoflatmf = state.noflatmf
        try:
            state.shallowremote = True
            state.match = match.always(repo.root, '')
            state.noflatmf = other.get('noflatmanifest') == 'True'
            if includepattern or excludepattern:
                state.match = match.match(repo.root, '', None,
                                          includepattern, excludepattern)
            streamres = wireprotov1server.stream(repo, proto)

            # Force the first value to execute, so the file list is computed
            # within the try/finally scope
            first = next(streamres.gen)
            second = next(streamres.gen)
            def gen():
                yield first
                yield second
                for value in streamres.gen:
                    yield value
            return wireprototypes.streamres(gen())
        finally:
            state.shallowremote = oldshallow
            state.match = oldmatch
            state.noflatmf = oldnoflatmf

    wireprotov1server.commands['stream_out_shallow'] = (stream_out_shallow, '*')

    # don't clone filelogs to shallow clients
    def _walkstreamfiles(orig, repo, matcher=None):
        if state.shallowremote:
            # if we are shallow ourselves, stream our local commits
            if shallowutil.isenabled(repo):
                # Walk the store's data/ directory by hand, yielding every
                # file that is not a revlog (.i/.d) — i.e. remotefilelog's
                # own data files.
                striplen = len(repo.store.path) + 1
                readdir = repo.store.rawvfs.readdir
                visit = [os.path.join(repo.store.path, 'data')]
                while visit:
                    p = visit.pop()
                    for f, kind, st in readdir(p, stat=True):
                        fp = p + '/' + f
                        if kind == stat.S_IFREG:
                            if not fp.endswith('.i') and not fp.endswith('.d'):
                                n = util.pconvert(fp[striplen:])
                                yield (store.decodedir(n), n, st.st_size)
                        if kind == stat.S_IFDIR:
                            visit.append(fp)

            # Tree manifest revlogs live under meta/ and are always sent.
            if 'treemanifest' in repo.requirements:
                for (u, e, s) in repo.store.datafiles():
                    if (u.startswith('meta/') and
                        (u.endswith('.i') or u.endswith('.d'))):
                        yield (u, e, s)

            # Return .d and .i files that do not match the shallow pattern
            match = state.match
            if match and not match.always():
                for (u, e, s) in repo.store.datafiles():
                    f = u[5:-2] # trim data/...  and .i/.d
                    if not state.match(f):
                        yield (u, e, s)

            for x in repo.store.topfiles():
                # Skip the flat manifest when the client asked for
                # noflatmanifest.
                if state.noflatmf and x[0][:11] == '00manifest.':
                    continue
                yield x

        elif shallowutil.isenabled(repo):
            # don't allow cloning from a shallow repo to a full repo
            # since it would require fetching every version of every
            # file in order to create the revlogs.
            raise error.Abort(_("Cannot clone from a shallow repo "
                                "to a full repo."))
        else:
            # Not a shallow stream: defer to the original walker, forwarding
            # the (possibly None) matcher.
            for x in orig(repo, matcher):
                yield x

    extensions.wrapfunction(streamclone, '_walkstreamfiles', _walkstreamfiles)

    # We no longer use getbundle_shallow commands, but we must still
    # support it for migration purposes
    def getbundleshallow(repo, proto, others):
        bundlecaps = others.get('bundlecaps', '')
        bundlecaps = set(bundlecaps.split(','))
        bundlecaps.add(constants.BUNDLE2_CAPABLITY)
        others['bundlecaps'] = ','.join(bundlecaps)

        return wireprotov1server.commands["getbundle"][0](repo, proto, others)

    wireprotov1server.commands["getbundle_shallow"] = (getbundleshallow, '*')

    # expose remotefilelog capabilities
    def _capabilities(orig, repo, proto):
        caps = orig(repo, proto)
        if (shallowutil.isenabled(repo) or ui.configbool('remotefilelog',
                                                         'server')):
            if isinstance(proto, _sshv1server):
                # legacy getfiles method which only works over ssh
                caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
            caps.append('x_rfl_getflogheads')
            caps.append('x_rfl_getfile')
        return caps
    extensions.wrapfunction(wireprotov1server, '_capabilities', _capabilities)

    def _adjustlinkrev(orig, self, *args, **kwargs):
        # When generating file blobs, taking the real path is too slow on large
        # repos, so force it to just return the linkrev directly.
        repo = self._repo
        if util.safehasattr(repo, 'forcelinkrev') and repo.forcelinkrev:
            return self._filelog.linkrev(self._filelog.rev(self._filenode))
        return orig(self, *args, **kwargs)

    extensions.wrapfunction(
        context.basefilectx, '_adjustlinkrev', _adjustlinkrev)

    # 'x_rfl_getfiles' is only usable over ssh (getfiles aborts on other
    # transports), so report it as not-a-command to the generic dispatcher.
    def _iscmd(orig, cmd):
        if cmd == 'x_rfl_getfiles':
            return False
        return orig(cmd)

    extensions.wrapfunction(wireprotoserver, 'iscmd', _iscmd)
223 223
def _loadfileblob(repo, cachepath, path, node):
    """Return the zlib-compressed file blob for ``path``@``node``.

    The blob is cached on disk under ``cachepath/<path>/<hex(node)>``; it
    is (re)built via createfileblob() when missing or empty. Cache writes
    are best-effort: a read-only cache is tolerated silently.
    """
    filecachepath = os.path.join(cachepath, path, hex(node))
    if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
        filectx = repo.filectx(path, fileid=node)
        if filectx.node() == nullid:
            # the in-memory changelog may be stale (e.g. the commit landed
            # after this server process loaded it); reload and retry once
            repo.changelog = changelog.changelog(repo.svfs)
            filectx = repo.filectx(path, fileid=node)

        text = createfileblob(filectx)
        # TODO configurable compression engines
        text = zlib.compress(text)

        # everything should be user & group read/writable
        oldumask = os.umask(0o002)
        try:
            dirname = os.path.dirname(filecachepath)
            if not os.path.exists(dirname):
                try:
                    os.makedirs(dirname)
                except OSError as ex:
                    # a concurrent request may have created it already
                    if ex.errno != errno.EEXIST:
                        raise

            f = None
            try:
                # binary mode: the payload is zlib-compressed binary data,
                # and text mode would corrupt it on Windows
                f = util.atomictempfile(filecachepath, "wb")
                f.write(text)
            except (IOError, OSError):
                # Don't abort if the user only has permission to read,
                # and not write.
                pass
            finally:
                if f:
                    f.close()
        finally:
            os.umask(oldumask)
    else:
        # binary mode for the same reason as the write above
        with open(filecachepath, "rb") as f:
            text = f.read()
    return text
264 264
def getflogheads(repo, proto, path):
    """A server api for requesting a filelog's heads
    """
    heads = repo.file(path).heads()
    return '\n'.join(hex(h) for h in heads if h != nullid)
271 271
def getfile(repo, proto, file, node):
    """A server api for requesting a particular version of a file. Can be used
    in batches to request many files at once. The return protocol is:
    <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
    non-zero for an error.

    data is a compressed blob with revlog flag and ancestors information. See
    createfileblob for its content.
    """
    # a shallow server has no full filelogs to serve from
    if shallowutil.isenabled(repo):
        return '1\0' + _('cannot fetch remote files from shallow repo')
    cachepath = (repo.ui.config("remotefilelog", "servercachepath")
                 or os.path.join(repo.path, "remotefilelogcache"))
    node = bin(node.strip())
    if node == nullid:
        # nullid has no content; report success with an empty payload
        return '0\0'
    return '0\0' + _loadfileblob(repo, cachepath, file, node)
290 290
def getfiles(repo, proto):
    """A server api for requesting particular versions of particular files.
    """
    if shallowutil.isenabled(repo):
        raise error.Abort(_('cannot fetch remote files from shallow repo'))
    # this legacy command reads requests directly from the ssh stream, so it
    # cannot work on other transports
    if not isinstance(proto, _sshv1server):
        raise error.Abort(_('cannot fetch remote files over non-ssh protocol'))

    def streamer():
        fin = proto._fin

        cachepath = repo.ui.config("remotefilelog", "servercachepath")
        if not cachepath:
            cachepath = os.path.join(repo.path, "remotefilelogcache")

        # Each request line is '<40 hex chars of node><path>\n'; an empty
        # line ends the batch.
        while True:
            request = fin.readline()[:-1]
            if not request:
                break

            node = bin(request[:40])
            if node == nullid:
                # nullid has no content: respond with a zero-length payload
                yield '0\n'
                continue

            path = request[40:]

            text = _loadfileblob(repo, cachepath, path, node)

            # response framing: '<len>\n<blob>'
            yield '%d\n%s' % (len(text), text)

            # it would be better to only flush after processing a whole batch
            # but currently we don't know if there are more requests coming
            proto._fout.flush()
    return wireprototypes.streamres(streamer())
326 326
def createfileblob(filectx):
    """
    format:
        v0:
            str(len(rawtext)) + '\0' + rawtext + ancestortext
        v1:
            'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
            metalist := metalist + '\n' + meta | meta
            meta := sizemeta | flagmeta
            sizemeta := METAKEYSIZE + str(len(rawtext))
            flagmeta := METAKEYFLAG + str(flag)

    note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
    length of 1.
    """
    flog = filectx.filelog()
    frev = filectx.filerev()
    revlogflags = flog._revlog.flags(frev)
    if revlogflags == 0:
        # normal files
        text = filectx.data()
    else:
        # lfs, read raw revision data
        text = flog.revision(frev, raw=True)

    repo = filectx._repo
    ancestors = [filectx]

    try:
        # short-circuit linkrev computation while walking ancestors (see the
        # _adjustlinkrev wrapper installed by onetimesetup)
        repo.forcelinkrev = True
        ancestors.extend(filectx.ancestors())

        # serialize each ancestor as filenode+p1+p2+linknode+copyname+'\0'
        pieces = []
        for actx in ancestors:
            parents = actx.parents()
            p1 = parents[0].filenode() if len(parents) > 0 else nullid
            p2 = parents[1].filenode() if len(parents) > 1 else nullid

            rename = actx.renamed()
            copyname = rename[0] if rename else ""
            linknode = actx.node()
            pieces.append("%s%s%s%s%s\0" % (
                actx.filenode(), p1, p2, linknode,
                copyname))
        ancestortext = "".join(pieces)
    finally:
        repo.forcelinkrev = False

    header = shallowutil.buildfileblobheader(len(text), revlogflags)

    return "%s\0%s%s" % (header, text, ancestortext)
384 384
def gcserver(ui, repo):
    """Expire old entries from the server's file-blob cache.

    Collects the cache paths needed by the manifests of heads within the
    last 25000 changesets, then removes every cache file that is both
    unneeded and older than ``remotefilelog.serverexpiration`` days.
    No-op unless ``remotefilelog.server`` is enabled.
    """
    if not repo.ui.configbool("remotefilelog", "server"):
        return

    neededfiles = set()
    heads = repo.revs("heads(tip~25000:) - null")

    cachepath = repo.vfs.join("remotefilelogcache")
    for head in heads:
        mf = repo[head].manifest()
        for filename, filenode in mf.iteritems():
            filecachepath = os.path.join(cachepath, filename, hex(filenode))
            neededfiles.add(filecachepath)

    # delete unneeded older files
    days = repo.ui.configint("remotefilelog", "serverexpiration")
    expiration = time.time() - (days * 24 * 60 * 60)

    _removing = _("removing old server cache")
    count = 0
    ui.progress(_removing, count, unit="files")
    for root, dirs, files in os.walk(cachepath):
        # 'fname' / 'st' avoid shadowing the 'file' builtin and the
        # module-level 'stat' import
        for fname in files:
            filepath = os.path.join(root, fname)
            count += 1
            ui.progress(_removing, count, unit="files")
            if filepath in neededfiles:
                continue

            st = os.stat(filepath)
            if st.st_mtime < expiration:
                os.remove(filepath)

    ui.progress(_removing, None)
General Comments 0
You need to be logged in to leave comments. Login now