##// END OF EJS Templates
remotefilelog: fix what looks like a wrong refactoring...
Valentin Gatien-Baron -
r48601:6802422a stable
parent child Browse files
Show More
@@ -1,441 +1,441 b''
1 1 # remotefilelogserver.py - server logic for a remotefilelog server
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import errno
10 10 import os
11 11 import stat
12 12 import time
13 13 import zlib
14 14
15 15 from mercurial.i18n import _
16 16 from mercurial.node import bin, hex
17 17 from mercurial.pycompat import open
18 18 from mercurial import (
19 19 changegroup,
20 20 changelog,
21 21 context,
22 22 error,
23 23 extensions,
24 24 match,
25 25 pycompat,
26 26 scmutil,
27 27 store,
28 28 streamclone,
29 29 util,
30 30 wireprotoserver,
31 31 wireprototypes,
32 32 wireprotov1server,
33 33 )
34 34 from . import (
35 35 constants,
36 36 shallowutil,
37 37 )
38 38
39 39 _sshv1server = wireprotoserver.sshv1protocolhandler
40 40
41 41
42 42 def setupserver(ui, repo):
43 43 """Sets up a normal Mercurial repo so it can serve files to shallow repos."""
44 44 onetimesetup(ui)
45 45
46 46 # don't send files to shallow clients during pulls
47 47 def generatefiles(
48 48 orig, self, changedfiles, linknodes, commonrevs, source, *args, **kwargs
49 49 ):
50 50 caps = self._bundlecaps or []
51 51 if constants.BUNDLE2_CAPABLITY in caps:
52 52 # only send files that don't match the specified patterns
53 53 includepattern = None
54 54 excludepattern = None
55 55 for cap in self._bundlecaps or []:
56 56 if cap.startswith(b"includepattern="):
57 57 includepattern = cap[len(b"includepattern=") :].split(b'\0')
58 58 elif cap.startswith(b"excludepattern="):
59 59 excludepattern = cap[len(b"excludepattern=") :].split(b'\0')
60 60
61 61 m = match.always()
62 62 if includepattern or excludepattern:
63 63 m = match.match(
64 64 repo.root, b'', None, includepattern, excludepattern
65 65 )
66 66
67 67 changedfiles = list([f for f in changedfiles if not m(f)])
68 68 return orig(
69 69 self, changedfiles, linknodes, commonrevs, source, *args, **kwargs
70 70 )
71 71
72 72 extensions.wrapfunction(
73 73 changegroup.cgpacker, b'generatefiles', generatefiles
74 74 )
75 75
76 76
77 77 onetime = False
78 78
79 79
80 80 def onetimesetup(ui):
81 81 """Configures the wireprotocol for both clients and servers."""
82 82 global onetime
83 83 if onetime:
84 84 return
85 85 onetime = True
86 86
87 87 # support file content requests
88 88 wireprotov1server.wireprotocommand(
89 89 b'x_rfl_getflogheads', b'path', permission=b'pull'
90 90 )(getflogheads)
91 91 wireprotov1server.wireprotocommand(
92 92 b'x_rfl_getfiles', b'', permission=b'pull'
93 93 )(getfiles)
94 94 wireprotov1server.wireprotocommand(
95 95 b'x_rfl_getfile', b'file node', permission=b'pull'
96 96 )(getfile)
97 97
98 98 class streamstate(object):
99 99 match = None
100 100 shallowremote = False
101 101 noflatmf = False
102 102
103 103 state = streamstate()
104 104
105 105 def stream_out_shallow(repo, proto, other):
106 106 includepattern = None
107 107 excludepattern = None
108 108 raw = other.get(b'includepattern')
109 109 if raw:
110 110 includepattern = raw.split(b'\0')
111 111 raw = other.get(b'excludepattern')
112 112 if raw:
113 113 excludepattern = raw.split(b'\0')
114 114
115 115 oldshallow = state.shallowremote
116 116 oldmatch = state.match
117 117 oldnoflatmf = state.noflatmf
118 118 try:
119 119 state.shallowremote = True
120 120 state.match = match.always()
121 121 state.noflatmf = other.get(b'noflatmanifest') == b'True'
122 122 if includepattern or excludepattern:
123 123 state.match = match.match(
124 124 repo.root, b'', None, includepattern, excludepattern
125 125 )
126 126 streamres = wireprotov1server.stream(repo, proto)
127 127
128 128 # Force the first value to execute, so the file list is computed
129 129 # within the try/finally scope
130 130 first = next(streamres.gen)
131 131 second = next(streamres.gen)
132 132
133 133 def gen():
134 134 yield first
135 135 yield second
136 136 for value in streamres.gen:
137 137 yield value
138 138
139 139 return wireprototypes.streamres(gen())
140 140 finally:
141 141 state.shallowremote = oldshallow
142 142 state.match = oldmatch
143 143 state.noflatmf = oldnoflatmf
144 144
145 145 wireprotov1server.commands[b'stream_out_shallow'] = (
146 146 stream_out_shallow,
147 147 b'*',
148 148 )
149 149
150 150 # don't clone filelogs to shallow clients
151 151 def _walkstreamfiles(orig, repo, matcher=None):
152 152 if state.shallowremote:
153 153 # if we are shallow ourselves, stream our local commits
154 154 if shallowutil.isenabled(repo):
155 155 striplen = len(repo.store.path) + 1
156 156 readdir = repo.store.rawvfs.readdir
157 157 visit = [os.path.join(repo.store.path, b'data')]
158 158 while visit:
159 159 p = visit.pop()
160 160 for f, kind, st in readdir(p, stat=True):
161 161 fp = p + b'/' + f
162 162 if kind == stat.S_IFREG:
163 163 if not fp.endswith(b'.i') and not fp.endswith(
164 164 b'.d'
165 165 ):
166 166 n = util.pconvert(fp[striplen:])
167 167 d = store.decodedir(n)
168 168 t = store.FILETYPE_OTHER
169 169 yield (t, d, n, st.st_size)
170 170 if kind == stat.S_IFDIR:
171 171 visit.append(fp)
172 172
173 173 if scmutil.istreemanifest(repo):
174 174 for (t, u, e, s) in repo.store.datafiles():
175 175 if u.startswith(b'meta/') and (
176 176 u.endswith(b'.i') or u.endswith(b'.d')
177 177 ):
178 178 yield (t, u, e, s)
179 179
180 180 # Return .d and .i files that do not match the shallow pattern
181 181 match = state.match
182 182 if match and not match.always():
183 183 for (t, u, e, s) in repo.store.datafiles():
184 184 f = u[5:-2] # trim data/... and .i/.d
185 185 if not state.match(f):
186 186 yield (t, u, e, s)
187 187
188 188 for x in repo.store.topfiles():
189 if state.noflatmf and x[0][:11] == b'00manifest.':
189 if state.noflatmf and x[1][:11] == b'00manifest.':
190 190 continue
191 191 yield x
192 192
193 193 elif shallowutil.isenabled(repo):
194 194 # don't allow cloning from a shallow repo to a full repo
195 195 # since it would require fetching every version of every
196 196 # file in order to create the revlogs.
197 197 raise error.Abort(
198 198 _(b"Cannot clone from a shallow repo to a full repo.")
199 199 )
200 200 else:
201 201 for x in orig(repo, matcher):
202 202 yield x
203 203
204 204 extensions.wrapfunction(streamclone, b'_walkstreamfiles', _walkstreamfiles)
205 205
206 206 # expose remotefilelog capabilities
207 207 def _capabilities(orig, repo, proto):
208 208 caps = orig(repo, proto)
209 209 if shallowutil.isenabled(repo) or ui.configbool(
210 210 b'remotefilelog', b'server'
211 211 ):
212 212 if isinstance(proto, _sshv1server):
213 213 # legacy getfiles method which only works over ssh
214 214 caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
215 215 caps.append(b'x_rfl_getflogheads')
216 216 caps.append(b'x_rfl_getfile')
217 217 return caps
218 218
219 219 extensions.wrapfunction(wireprotov1server, b'_capabilities', _capabilities)
220 220
221 221 def _adjustlinkrev(orig, self, *args, **kwargs):
222 222 # When generating file blobs, taking the real path is too slow on large
223 223 # repos, so force it to just return the linkrev directly.
224 224 repo = self._repo
225 225 if util.safehasattr(repo, b'forcelinkrev') and repo.forcelinkrev:
226 226 return self._filelog.linkrev(self._filelog.rev(self._filenode))
227 227 return orig(self, *args, **kwargs)
228 228
229 229 extensions.wrapfunction(
230 230 context.basefilectx, b'_adjustlinkrev', _adjustlinkrev
231 231 )
232 232
233 233 def _iscmd(orig, cmd):
234 234 if cmd == b'x_rfl_getfiles':
235 235 return False
236 236 return orig(cmd)
237 237
238 238 extensions.wrapfunction(wireprotoserver, b'iscmd', _iscmd)
239 239
240 240
241 241 def _loadfileblob(repo, cachepath, path, node):
242 242 filecachepath = os.path.join(cachepath, path, hex(node))
243 243 if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
244 244 filectx = repo.filectx(path, fileid=node)
245 245 if filectx.node() == repo.nullid:
246 246 repo.changelog = changelog.changelog(repo.svfs)
247 247 filectx = repo.filectx(path, fileid=node)
248 248
249 249 text = createfileblob(filectx)
250 250 # TODO configurable compression engines
251 251 text = zlib.compress(text)
252 252
253 253 # everything should be user & group read/writable
254 254 oldumask = os.umask(0o002)
255 255 try:
256 256 dirname = os.path.dirname(filecachepath)
257 257 if not os.path.exists(dirname):
258 258 try:
259 259 os.makedirs(dirname)
260 260 except OSError as ex:
261 261 if ex.errno != errno.EEXIST:
262 262 raise
263 263
264 264 f = None
265 265 try:
266 266 f = util.atomictempfile(filecachepath, b"wb")
267 267 f.write(text)
268 268 except (IOError, OSError):
269 269 # Don't abort if the user only has permission to read,
270 270 # and not write.
271 271 pass
272 272 finally:
273 273 if f:
274 274 f.close()
275 275 finally:
276 276 os.umask(oldumask)
277 277 else:
278 278 with open(filecachepath, b"rb") as f:
279 279 text = f.read()
280 280 return text
281 281
282 282
283 283 def getflogheads(repo, proto, path):
284 284 """A server api for requesting a filelog's heads"""
285 285 flog = repo.file(path)
286 286 heads = flog.heads()
287 287 return b'\n'.join((hex(head) for head in heads if head != repo.nullid))
288 288
289 289
290 290 def getfile(repo, proto, file, node):
291 291 """A server api for requesting a particular version of a file. Can be used
292 292 in batches to request many files at once. The return protocol is:
293 293 <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
294 294 non-zero for an error.
295 295
296 296 data is a compressed blob with revlog flag and ancestors information. See
297 297 createfileblob for its content.
298 298 """
299 299 if shallowutil.isenabled(repo):
300 300 return b'1\0' + _(b'cannot fetch remote files from shallow repo')
301 301 cachepath = repo.ui.config(b"remotefilelog", b"servercachepath")
302 302 if not cachepath:
303 303 cachepath = os.path.join(repo.path, b"remotefilelogcache")
304 304 node = bin(node.strip())
305 305 if node == repo.nullid:
306 306 return b'0\0'
307 307 return b'0\0' + _loadfileblob(repo, cachepath, file, node)
308 308
309 309
310 310 def getfiles(repo, proto):
311 311 """A server api for requesting particular versions of particular files."""
312 312 if shallowutil.isenabled(repo):
313 313 raise error.Abort(_(b'cannot fetch remote files from shallow repo'))
314 314 if not isinstance(proto, _sshv1server):
315 315 raise error.Abort(_(b'cannot fetch remote files over non-ssh protocol'))
316 316
317 317 def streamer():
318 318 fin = proto._fin
319 319
320 320 cachepath = repo.ui.config(b"remotefilelog", b"servercachepath")
321 321 if not cachepath:
322 322 cachepath = os.path.join(repo.path, b"remotefilelogcache")
323 323
324 324 while True:
325 325 request = fin.readline()[:-1]
326 326 if not request:
327 327 break
328 328
329 329 node = bin(request[:40])
330 330 if node == repo.nullid:
331 331 yield b'0\n'
332 332 continue
333 333
334 334 path = request[40:]
335 335
336 336 text = _loadfileblob(repo, cachepath, path, node)
337 337
338 338 yield b'%d\n%s' % (len(text), text)
339 339
340 340 # it would be better to only flush after processing a whole batch
341 341 # but currently we don't know if there are more requests coming
342 342 proto._fout.flush()
343 343
344 344 return wireprototypes.streamres(streamer())
345 345
346 346
347 347 def createfileblob(filectx):
348 348 """
349 349 format:
350 350 v0:
351 351 str(len(rawtext)) + '\0' + rawtext + ancestortext
352 352 v1:
353 353 'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
354 354 metalist := metalist + '\n' + meta | meta
355 355 meta := sizemeta | flagmeta
356 356 sizemeta := METAKEYSIZE + str(len(rawtext))
357 357 flagmeta := METAKEYFLAG + str(flag)
358 358
359 359 note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
360 360 length of 1.
361 361 """
362 362 flog = filectx.filelog()
363 363 frev = filectx.filerev()
364 364 revlogflags = flog._revlog.flags(frev)
365 365 if revlogflags == 0:
366 366 # normal files
367 367 text = filectx.data()
368 368 else:
369 369 # lfs, read raw revision data
370 370 text = flog.rawdata(frev)
371 371
372 372 repo = filectx._repo
373 373
374 374 ancestors = [filectx]
375 375
376 376 try:
377 377 repo.forcelinkrev = True
378 378 ancestors.extend([f for f in filectx.ancestors()])
379 379
380 380 ancestortext = b""
381 381 for ancestorctx in ancestors:
382 382 parents = ancestorctx.parents()
383 383 p1 = repo.nullid
384 384 p2 = repo.nullid
385 385 if len(parents) > 0:
386 386 p1 = parents[0].filenode()
387 387 if len(parents) > 1:
388 388 p2 = parents[1].filenode()
389 389
390 390 copyname = b""
391 391 rename = ancestorctx.renamed()
392 392 if rename:
393 393 copyname = rename[0]
394 394 linknode = ancestorctx.node()
395 395 ancestortext += b"%s%s%s%s%s\0" % (
396 396 ancestorctx.filenode(),
397 397 p1,
398 398 p2,
399 399 linknode,
400 400 copyname,
401 401 )
402 402 finally:
403 403 repo.forcelinkrev = False
404 404
405 405 header = shallowutil.buildfileblobheader(len(text), revlogflags)
406 406
407 407 return b"%s\0%s%s" % (header, text, ancestortext)
408 408
409 409
410 410 def gcserver(ui, repo):
411 411 if not repo.ui.configbool(b"remotefilelog", b"server"):
412 412 return
413 413
414 414 neededfiles = set()
415 415 heads = repo.revs(b"heads(tip~25000:) - null")
416 416
417 417 cachepath = repo.vfs.join(b"remotefilelogcache")
418 418 for head in heads:
419 419 mf = repo[head].manifest()
420 420 for filename, filenode in pycompat.iteritems(mf):
421 421 filecachepath = os.path.join(cachepath, filename, hex(filenode))
422 422 neededfiles.add(filecachepath)
423 423
424 424 # delete unneeded older files
425 425 days = repo.ui.configint(b"remotefilelog", b"serverexpiration")
426 426 expiration = time.time() - (days * 24 * 60 * 60)
427 427
428 428 progress = ui.makeprogress(_(b"removing old server cache"), unit=b"files")
429 429 progress.update(0)
430 430 for root, dirs, files in os.walk(cachepath):
431 431 for file in files:
432 432 filepath = os.path.join(root, file)
433 433 progress.increment()
434 434 if filepath in neededfiles:
435 435 continue
436 436
437 437 stat = os.stat(filepath)
438 438 if stat.st_mtime < expiration:
439 439 os.remove(filepath)
440 440
441 441 progress.complete()
General Comments 0
You need to be logged in to leave comments. Login now