##// END OF EJS Templates
remotefilelog: use progress helper in remotefilelogserver...
Martin von Zweigbergk -
r40877:fbd053af default
parent child Browse files
Show More
@@ -1,406 +1,404 b''
1 1 # remotefilelogserver.py - server logic for a remotefilelog server
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import errno
10 10 import os
11 11 import stat
12 12 import time
13 13 import zlib
14 14
15 15 from mercurial.i18n import _
16 16 from mercurial.node import bin, hex, nullid
17 17 from mercurial import (
18 18 changegroup,
19 19 changelog,
20 20 context,
21 21 error,
22 22 extensions,
23 23 match,
24 24 store,
25 25 streamclone,
26 26 util,
27 27 wireprotoserver,
28 28 wireprototypes,
29 29 wireprotov1server,
30 30 )
31 31 from . import (
32 32 constants,
33 33 shallowutil,
34 34 )
35 35
36 36 _sshv1server = wireprotoserver.sshv1protocolhandler
37 37
def setupserver(ui, repo):
    """Sets up a normal Mercurial repo so it can serve files to shallow repos.

    Registers the wireproto commands (once per process) and wraps changegroup
    file generation so that full file contents are not bundled for shallow
    (remotefilelog-capable) clients during pulls.
    """
    onetimesetup(ui)

    # don't send files to shallow clients during pulls
    def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source,
                      *args, **kwargs):
        caps = self._bundlecaps or []
        if constants.BUNDLE2_CAPABLITY in caps:
            # only send files that don't match the specified patterns
            includepattern = None
            excludepattern = None
            for cap in (self._bundlecaps or []):
                if cap.startswith("includepattern="):
                    includepattern = cap[len("includepattern="):].split('\0')
                elif cap.startswith("excludepattern="):
                    excludepattern = cap[len("excludepattern="):].split('\0')

            m = match.always(repo.root, '')
            if includepattern or excludepattern:
                m = match.match(repo.root, '', None,
                                includepattern, excludepattern)

            # keep only files the shallow client will NOT fetch on demand
            # (redundant list() around the comprehension removed)
            changedfiles = [f for f in changedfiles if not m(f)]
        return orig(self, changedfiles, linknodes, commonrevs, source,
                    *args, **kwargs)

    extensions.wrapfunction(
        changegroup.cgpacker, 'generatefiles', generatefiles)
68 68
69 69 onetime = False
def onetimesetup(ui):
    """Configures the wireprotocol for both clients and servers.
    """
    # Wireproto registration is process-global; run at most once.
    global onetime
    if onetime:
        return
    onetime = True

    # support file content requests
    wireprotov1server.wireprotocommand(
        'x_rfl_getflogheads', 'path', permission='pull')(getflogheads)
    wireprotov1server.wireprotocommand(
        'x_rfl_getfiles', '', permission='pull')(getfiles)
    wireprotov1server.wireprotocommand(
        'x_rfl_getfile', 'file node', permission='pull')(getfile)

    # Mutable holder shared between stream_out_shallow (which sets it) and
    # the _walkstreamfiles wrapper below (which reads it).
    class streamstate(object):
        match = None            # narrow matcher for the active stream, or None
        shallowremote = False   # True while serving a shallow stream clone
        noflatmf = False        # client asked to omit the flat manifest
    state = streamstate()

    def stream_out_shallow(repo, proto, other):
        # 'stream_out_shallow' wireproto command: a stream clone whose file
        # list is filtered according to the client's include/exclude patterns.
        includepattern = None
        excludepattern = None
        raw = other.get('includepattern')
        if raw:
            includepattern = raw.split('\0')
        raw = other.get('excludepattern')
        if raw:
            excludepattern = raw.split('\0')

        # Save the previous global state so concurrent/nested use is restored.
        oldshallow = state.shallowremote
        oldmatch = state.match
        oldnoflatmf = state.noflatmf
        try:
            state.shallowremote = True
            state.match = match.always(repo.root, '')
            state.noflatmf = other.get('noflatmanifest') == 'True'
            if includepattern or excludepattern:
                state.match = match.match(repo.root, '', None,
                                          includepattern, excludepattern)
            streamres = wireprotov1server.stream(repo, proto)

            # Force the first value to execute, so the file list is computed
            # within the try/finally scope
            first = next(streamres.gen)
            second = next(streamres.gen)
            def gen():
                yield first
                yield second
                for value in streamres.gen:
                    yield value
            return wireprototypes.streamres(gen())
        finally:
            state.shallowremote = oldshallow
            state.match = oldmatch
            state.noflatmf = oldnoflatmf

    wireprotov1server.commands['stream_out_shallow'] = (stream_out_shallow, '*')

    # don't clone filelogs to shallow clients
    def _walkstreamfiles(orig, repo, matcher=None):
        if state.shallowremote:
            # if we are shallow ourselves, stream our local commits
            if shallowutil.isenabled(repo):
                # Walk the store's data/ directory by hand, yielding
                # everything that is NOT a revlog (.i/.d) file.
                striplen = len(repo.store.path) + 1
                readdir = repo.store.rawvfs.readdir
                visit = [os.path.join(repo.store.path, 'data')]
                while visit:
                    p = visit.pop()
                    for f, kind, st in readdir(p, stat=True):
                        fp = p + '/' + f
                        if kind == stat.S_IFREG:
                            if not fp.endswith('.i') and not fp.endswith('.d'):
                                n = util.pconvert(fp[striplen:])
                                yield (store.decodedir(n), n, st.st_size)
                        if kind == stat.S_IFDIR:
                            visit.append(fp)

            # Tree manifest revlogs live under meta/ and are always streamed.
            if 'treemanifest' in repo.requirements:
                for (u, e, s) in repo.store.datafiles():
                    if (u.startswith('meta/') and
                        (u.endswith('.i') or u.endswith('.d'))):
                        yield (u, e, s)

            # Return .d and .i files that do not match the shallow pattern
            match = state.match
            if match and not match.always():
                for (u, e, s) in repo.store.datafiles():
                    f = u[5:-2]  # trim data/... and .i/.d
                    if not state.match(f):
                        yield (u, e, s)

            for x in repo.store.topfiles():
                # Skip the flat manifest when the client requested treeonly.
                if state.noflatmf and x[0][:11] == '00manifest.':
                    continue
                yield x

        elif shallowutil.isenabled(repo):
            # don't allow cloning from a shallow repo to a full repo
            # since it would require fetching every version of every
            # file in order to create the revlogs.
            raise error.Abort(_("Cannot clone from a shallow repo "
                                "to a full repo."))
        else:
            # Normal (non-shallow) stream clone: defer to the original.
            for x in orig(repo, matcher):
                yield x

    extensions.wrapfunction(streamclone, '_walkstreamfiles', _walkstreamfiles)

    # expose remotefilelog capabilities
    def _capabilities(orig, repo, proto):
        caps = orig(repo, proto)
        if (shallowutil.isenabled(repo) or ui.configbool('remotefilelog',
                                                         'server')):
            if isinstance(proto, _sshv1server):
                # legacy getfiles method which only works over ssh
                caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
            caps.append('x_rfl_getflogheads')
            caps.append('x_rfl_getfile')
        return caps
    extensions.wrapfunction(wireprotov1server, '_capabilities', _capabilities)

    def _adjustlinkrev(orig, self, *args, **kwargs):
        # When generating file blobs, taking the real path is too slow on large
        # repos, so force it to just return the linkrev directly.
        repo = self._repo
        if util.safehasattr(repo, 'forcelinkrev') and repo.forcelinkrev:
            return self._filelog.linkrev(self._filelog.rev(self._filenode))
        return orig(self, *args, **kwargs)

    extensions.wrapfunction(
        context.basefilectx, '_adjustlinkrev', _adjustlinkrev)

    def _iscmd(orig, cmd):
        # Hide the ssh-only legacy 'x_rfl_getfiles' from other transports.
        if cmd == 'x_rfl_getfiles':
            return False
        return orig(cmd)

    extensions.wrapfunction(wireprotoserver, 'iscmd', _iscmd)
211 211
def _loadfileblob(repo, cachepath, path, node):
    """Return the zlib-compressed file blob for path@node.

    Blobs are cached on disk under ``cachepath``; a present, non-empty cache
    file is returned directly. Otherwise the blob is built, compressed and
    written back best-effort (a read-only cache is tolerated silently).
    """
    cachefile = os.path.join(cachepath, path, hex(node))
    if os.path.exists(cachefile) and os.path.getsize(cachefile) > 0:
        # Cache hit: hand back the stored compressed blob.
        with open(cachefile, "rb") as fh:
            return fh.read()

    filectx = repo.filectx(path, fileid=node)
    if filectx.node() == nullid:
        # Stale changelog in this long-running process; reload and retry.
        repo.changelog = changelog.changelog(repo.svfs)
        filectx = repo.filectx(path, fileid=node)

    # TODO configurable compression engines
    blob = zlib.compress(createfileblob(filectx))

    # everything should be user & group read/writable
    oldumask = os.umask(0o002)
    try:
        dirname = os.path.dirname(cachefile)
        if not os.path.exists(dirname):
            try:
                os.makedirs(dirname)
            except OSError as ex:
                # A concurrent writer may have created it already.
                if ex.errno != errno.EEXIST:
                    raise

        fh = None
        try:
            fh = util.atomictempfile(cachefile, "wb")
            fh.write(blob)
        except (IOError, OSError):
            # Don't abort if the user only has permission to read,
            # and not write.
            pass
        finally:
            if fh:
                fh.close()
    finally:
        os.umask(oldumask)
    return blob
252 252
def getflogheads(repo, proto, path):
    """A server api for requesting a filelog's heads.

    Returns the hex head nodes joined by newlines; nullid heads are skipped.
    """
    filelog = repo.file(path)
    hexheads = [hex(h) for h in filelog.heads() if h != nullid]
    return '\n'.join(hexheads)
259 259
def getfile(repo, proto, file, node):
    """A server api for requesting a particular version of a file. Can be used
    in batches to request many files at once.

    The reply is ``<errorcode>\\0<payload>``: errorcode "0" means success and
    the payload is a compressed blob with revlog flag and ancestors
    information (see createfileblob); a non-zero code carries an error
    message as the payload.
    """
    if shallowutil.isenabled(repo):
        return '1\0' + _('cannot fetch remote files from shallow repo')
    cachepath = (repo.ui.config("remotefilelog", "servercachepath")
                 or os.path.join(repo.path, "remotefilelogcache"))
    node = bin(node.strip())
    if node == nullid:
        return '0\0'
    return '0\0' + _loadfileblob(repo, cachepath, file, node)
278 278
def getfiles(repo, proto):
    """A server api for requesting particular versions of particular files.
    """
    if shallowutil.isenabled(repo):
        raise error.Abort(_('cannot fetch remote files from shallow repo'))
    if not isinstance(proto, _sshv1server):
        raise error.Abort(_('cannot fetch remote files over non-ssh protocol'))

    def streamer():
        # Read "<40-hex-node><path>" request lines from the ssh channel until
        # an empty line, answering each with "<len>\n<blob>".
        fin = proto._fin
        cachepath = (repo.ui.config("remotefilelog", "servercachepath")
                     or os.path.join(repo.path, "remotefilelogcache"))

        while True:
            request = fin.readline()[:-1]
            if not request:
                break

            node = bin(request[:40])
            path = request[40:]
            if node == nullid:
                yield '0\n'
                continue

            text = _loadfileblob(repo, cachepath, path, node)
            yield '%d\n%s' % (len(text), text)

            # it would be better to only flush after processing a whole batch
            # but currently we don't know if there are more requests coming
            proto._fout.flush()
    return wireprototypes.streamres(streamer())
314 314
def createfileblob(filectx):
    """Serialize a filectx plus its ancestor metadata into one blob.

    format:
        v0:
            str(len(rawtext)) + '\0' + rawtext + ancestortext
        v1:
            'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
            metalist := metalist + '\n' + meta | meta
            meta := sizemeta | flagmeta
            sizemeta := METAKEYSIZE + str(len(rawtext))
            flagmeta := METAKEYFLAG + str(flag)

        note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
        length of 1.
    """
    flog = filectx.filelog()
    frev = filectx.filerev()
    revlogflags = flog._revlog.flags(frev)
    if revlogflags == 0:
        # normal files
        text = filectx.data()
    else:
        # lfs, read raw revision data
        text = flog.revision(frev, raw=True)

    repo = filectx._repo

    ancestors = [filectx]

    try:
        # Taking the real linkrev path is too slow on large repos, so ask the
        # _adjustlinkrev wrapper (see onetimesetup) to short-circuit while we
        # walk the ancestors.
        repo.forcelinkrev = True
        # extend() takes any iterable; the old list-comprehension wrapper
        # around filectx.ancestors() was a pointless intermediate copy.
        ancestors.extend(filectx.ancestors())

        ancestortext = ""
        for ancestorctx in ancestors:
            parents = ancestorctx.parents()
            p1 = nullid
            p2 = nullid
            if len(parents) > 0:
                p1 = parents[0].filenode()
            if len(parents) > 1:
                p2 = parents[1].filenode()

            copyname = ""
            rename = ancestorctx.renamed()
            if rename:
                copyname = rename[0]
            linknode = ancestorctx.node()
            # Fixed-width record: filenode, p1, p2, linknode (20 bytes each),
            # then the copy source name, NUL-terminated.
            ancestortext += "%s%s%s%s%s\0" % (
                ancestorctx.filenode(), p1, p2, linknode,
                copyname)
    finally:
        repo.forcelinkrev = False

    header = shallowutil.buildfileblobheader(len(text), revlogflags)

    return "%s\0%s%s" % (header, text, ancestortext)
372 372
def gcserver(ui, repo):
    """Garbage-collect the server-side remotefilelog cache.

    Keeps blobs reachable from recent heads; removes anything else older
    than the configured ``remotefilelog.serverexpiration`` (in days).

    NOTE: the scraped diff left both the old ``ui.progress`` calls and the
    new ``ui.makeprogress`` helper interleaved; this is the coherent
    post-change version using the progress helper.
    """
    if not repo.ui.configbool("remotefilelog", "server"):
        return

    # Collect every cache path referenced by the manifests of recent heads.
    neededfiles = set()
    heads = repo.revs("heads(tip~25000:) - null")

    cachepath = repo.vfs.join("remotefilelogcache")
    for head in heads:
        mf = repo[head].manifest()
        for filename, filenode in mf.iteritems():
            filecachepath = os.path.join(cachepath, filename, hex(filenode))
            neededfiles.add(filecachepath)

    # delete unneeded older files
    days = repo.ui.configint("remotefilelog", "serverexpiration")
    expiration = time.time() - (days * 24 * 60 * 60)

    progress = ui.makeprogress(_("removing old server cache"), unit="files")
    progress.update(0)
    for root, dirs, files in os.walk(cachepath):
        for file in files:
            filepath = os.path.join(root, file)
            progress.increment()
            if filepath in neededfiles:
                continue

            # renamed from 'stat' to avoid shadowing the imported stat module
            filestat = os.stat(filepath)
            if filestat.st_mtime < expiration:
                os.remove(filepath)

    progress.complete()
General Comments 0
You need to be logged in to leave comments. Login now