##// END OF EJS Templates
store: use StoreEntry API instead of parsing filename in remotefilelog...
marmoute -
r51378:5805b8b2 default
parent child Browse files
Show More
@@ -1,443 +1,442 b''
1 1 # remotefilelogserver.py - server logic for a remotefilelog server
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import os
9 9 import stat
10 10 import time
11 11 import zlib
12 12
13 13 from mercurial.i18n import _
14 14 from mercurial.node import bin, hex
15 15 from mercurial.pycompat import open
16 16 from mercurial import (
17 17 changegroup,
18 18 changelog,
19 19 context,
20 20 error,
21 21 extensions,
22 22 match,
23 23 scmutil,
24 24 store,
25 25 streamclone,
26 26 util,
27 27 wireprotoserver,
28 28 wireprototypes,
29 29 wireprotov1server,
30 30 )
31 31 from . import (
32 32 constants,
33 33 shallowutil,
34 34 )
35 35
36 36 _sshv1server = wireprotoserver.sshv1protocolhandler
37 37
38 38
def setupserver(ui, repo):
    """Sets up a normal Mercurial repo so it can serve files to shallow repos."""
    onetimesetup(ui)

    # Shallow clients fetch file contents on demand, so a pull must not
    # push filelog data at them.  Wrap the changegroup packer and drop
    # every file matched by the shallow include/exclude patterns.
    def generatefiles(
        orig, self, changedfiles, linknodes, commonrevs, source, *args, **kwargs
    ):
        caps = self._bundlecaps or []
        if constants.BUNDLE2_CAPABLITY in caps:
            # The patterns arrive encoded as bundle capabilities, with
            # NUL-separated entries.
            includepattern = None
            excludepattern = None
            inc_prefix = b"includepattern="
            exc_prefix = b"excludepattern="
            for cap in self._bundlecaps or []:
                if cap.startswith(inc_prefix):
                    includepattern = cap[len(inc_prefix) :].split(b'\0')
                elif cap.startswith(exc_prefix):
                    excludepattern = cap[len(exc_prefix) :].split(b'\0')

            if includepattern or excludepattern:
                m = match.match(
                    repo.root, b'', None, includepattern, excludepattern
                )
            else:
                m = match.always()

            # Only send files that fall outside the shallow patterns.
            changedfiles = [f for f in changedfiles if not m(f)]
        return orig(
            self, changedfiles, linknodes, commonrevs, source, *args, **kwargs
        )

    extensions.wrapfunction(
        changegroup.cgpacker, b'generatefiles', generatefiles
    )
72 72
73 73
74 74 onetime = False
75 75
76 76
def onetimesetup(ui):
    """Configures the wireprotocol for both clients and servers."""
    global onetime
    if onetime:
        return
    onetime = True

    # Register the commands that let shallow clients request file contents.
    wireprotov1server.wireprotocommand(
        b'x_rfl_getflogheads', b'path', permission=b'pull'
    )(getflogheads)
    wireprotov1server.wireprotocommand(
        b'x_rfl_getfiles', b'', permission=b'pull'
    )(getfiles)
    wireprotov1server.wireprotocommand(
        b'x_rfl_getfile', b'file node', permission=b'pull'
    )(getfile)

    class streamstate:
        # Mutable state shared between stream_out_shallow and the
        # _walkstreamfiles wrapper below.
        match = None
        shallowremote = False
        noflatmf = False

    state = streamstate()

    def stream_out_shallow(repo, proto, other):
        includepattern = None
        excludepattern = None
        value = other.get(b'includepattern')
        if value:
            includepattern = value.split(b'\0')
        value = other.get(b'excludepattern')
        if value:
            excludepattern = value.split(b'\0')

        saved = (state.shallowremote, state.match, state.noflatmf)
        try:
            state.shallowremote = True
            state.match = match.always()
            state.noflatmf = other.get(b'noflatmanifest') == b'True'
            if includepattern or excludepattern:
                state.match = match.match(
                    repo.root, b'', None, includepattern, excludepattern
                )
            streamres = wireprotov1server.stream(repo, proto)

            # Pull the first two values eagerly so the file list is
            # computed while the shallow state is still installed
            # (i.e. inside this try/finally scope).
            first = next(streamres.gen)
            second = next(streamres.gen)

            def gen():
                yield first
                yield second
                yield from streamres.gen

            return wireprototypes.streamres(gen())
        finally:
            state.shallowremote, state.match, state.noflatmf = saved

    wireprotov1server.commands[b'stream_out_shallow'] = (
        stream_out_shallow,
        b'*',
    )

    # don't clone filelogs to shallow clients
    def _walkstreamfiles(orig, repo, matcher=None):
        if state.shallowremote:
            # if we are shallow ourselves, stream our local commits
            if shallowutil.isenabled(repo):
                striplen = len(repo.store.path) + 1
                readdir = repo.store.rawvfs.readdir
                pending = [os.path.join(repo.store.path, b'data')]
                while pending:
                    directory = pending.pop()
                    for name, kind, st in readdir(directory, stat=True):
                        fullpath = directory + b'/' + name
                        if kind == stat.S_IFREG:
                            # Skip revlog files; only loose blobs stream.
                            if not fullpath.endswith((b'.i', b'.d')):
                                relpath = util.pconvert(fullpath[striplen:])
                                decoded = store.decodedir(relpath)
                                yield store.SimpleStoreEntry(
                                    unencoded_path=decoded,
                                    is_volatile=False,
                                    file_size=st.st_size,
                                )

                        if kind == stat.S_IFDIR:
                            pending.append(fullpath)

            if scmutil.istreemanifest(repo):
                for entry in repo.store.datafiles():
                    if not entry.is_revlog:
                        continue
                    if entry.revlog_type == store.FILEFLAGS_MANIFESTLOG:
                        yield entry

            # Return .d and .i files that do not match the shallow pattern
            shallowmatch = state.match
            if shallowmatch and not shallowmatch.always():
                for entry in repo.store.datafiles():
                    if not entry.is_revlog:
                        continue
                    if not state.match(entry.target_id):
                        yield entry

            for topentry in repo.store.topfiles():
                if state.noflatmf and topentry[1][:11] == b'00manifest.':
                    continue
                yield topentry

        elif shallowutil.isenabled(repo):
            # don't allow cloning from a shallow repo to a full repo
            # since it would require fetching every version of every
            # file in order to create the revlogs.
            raise error.Abort(
                _(b"Cannot clone from a shallow repo to a full repo.")
            )
        else:
            yield from orig(repo, matcher)

    extensions.wrapfunction(streamclone, b'_walkstreamfiles', _walkstreamfiles)

    # expose remotefilelog capabilities
    def _capabilities(orig, repo, proto):
        caps = orig(repo, proto)
        if shallowutil.isenabled(repo) or ui.configbool(
            b'remotefilelog', b'server'
        ):
            if isinstance(proto, _sshv1server):
                # legacy getfiles method which only works over ssh
                caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
            caps.append(b'x_rfl_getflogheads')
            caps.append(b'x_rfl_getfile')
        return caps

    extensions.wrapfunction(wireprotov1server, b'_capabilities', _capabilities)

    def _adjustlinkrev(orig, self, *args, **kwargs):
        # When generating file blobs, taking the real path is too slow on large
        # repos, so force it to just return the linkrev directly.
        repo = self._repo
        if util.safehasattr(repo, b'forcelinkrev') and repo.forcelinkrev:
            return self._filelog.linkrev(self._filelog.rev(self._filenode))
        return orig(self, *args, **kwargs)

    extensions.wrapfunction(
        context.basefilectx, b'_adjustlinkrev', _adjustlinkrev
    )

    def _iscmd(orig, cmd):
        # x_rfl_getfiles streams over raw ssh; hide it from the generic
        # command dispatcher.
        if cmd == b'x_rfl_getfiles':
            return False
        return orig(cmd)

    extensions.wrapfunction(wireprotoserver, b'iscmd', _iscmd)
242 241
243 242
def _loadfileblob(repo, cachepath, path, node):
    """Return the zlib-compressed file blob for ``path`` at ``node``.

    Blobs are cached on disk under ``cachepath``; on a cache miss (absent
    or zero-length file) the blob is rebuilt with createfileblob() and
    written back best-effort — cache-write failures are ignored so that
    read-only users can still be served.
    """
    filecachepath = os.path.join(cachepath, path, hex(node))
    if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
        filectx = repo.filectx(path, fileid=node)
        if filectx.node() == repo.nullid:
            # Stale in-memory changelog; reload and retry the lookup.
            repo.changelog = changelog.changelog(repo.svfs)
            filectx = repo.filectx(path, fileid=node)

        text = createfileblob(filectx)
        # TODO configurable compression engines
        text = zlib.compress(text)

        # everything should be user & group read/writable
        oldumask = os.umask(0o002)
        try:
            dirname = os.path.dirname(filecachepath)
            try:
                # EAFP: a concurrent writer may create the directory between
                # any check and the makedirs call, so just try and tolerate
                # "already exists".
                os.makedirs(dirname)
            except FileExistsError:
                pass

            f = None
            try:
                f = util.atomictempfile(filecachepath, b"wb")
                f.write(text)
            except OSError:
                # Don't abort if the user only has permission to read,
                # and not write.  (IOError is an alias of OSError on py3.)
                pass
            finally:
                if f:
                    f.close()
        finally:
            os.umask(oldumask)
    else:
        with open(filecachepath, b"rb") as f:
            text = f.read()
    return text
283 282
284 283
def getflogheads(repo, proto, path):
    """A server api for requesting a filelog's heads"""
    flog = repo.file(path)
    nullid = repo.nullid
    # Hex-encode every head except the null node, one per line.
    hexheads = [hex(head) for head in flog.heads() if head != nullid]
    return b'\n'.join(hexheads)
290 289
291 290
def getfile(repo, proto, file, node):
    """A server api for requesting a particular version of a file. Can be used
    in batches to request many files at once. The return protocol is:
    <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
    non-zero for an error.

    data is a compressed blob with revlog flag and ancestors information. See
    createfileblob for its content.
    """
    if shallowutil.isenabled(repo):
        return b'1\0' + _(b'cannot fetch remote files from shallow repo')

    cachepath = repo.ui.config(
        b"remotefilelog", b"servercachepath"
    ) or os.path.join(repo.path, b"remotefilelogcache")

    binnode = bin(node.strip())
    if binnode == repo.nullid:
        # The null revision has no content; report success with no data.
        return b'0\0'
    return b'0\0' + _loadfileblob(repo, cachepath, file, binnode)
310 309
311 310
def getfiles(repo, proto):
    """A server api for requesting particular versions of particular files."""
    if shallowutil.isenabled(repo):
        raise error.Abort(_(b'cannot fetch remote files from shallow repo'))
    if not isinstance(proto, _sshv1server):
        raise error.Abort(_(b'cannot fetch remote files over non-ssh protocol'))

    def streamer():
        fin = proto._fin

        cachepath = repo.ui.config(b"remotefilelog", b"servercachepath")
        if not cachepath:
            cachepath = os.path.join(repo.path, b"remotefilelogcache")

        while True:
            # Each request line is "<40-hex-node><path>\n".
            line = fin.readline()[:-1]
            if not line:
                break

            node = bin(line[:40])
            path = line[40:]
            if node == repo.nullid:
                yield b'0\n'
                continue

            blob = _loadfileblob(repo, cachepath, path, node)

            yield b'%d\n%s' % (len(blob), blob)

            # it would be better to only flush after processing a whole batch
            # but currently we don't know if there are more requests coming
            proto._fout.flush()

    return wireprototypes.streamres(streamer())
347 346
348 347
def createfileblob(filectx):
    """
    format:
    v0:
        str(len(rawtext)) + '\0' + rawtext + ancestortext
    v1:
        'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
        metalist := metalist + '\n' + meta | meta
        meta := sizemeta | flagmeta
        sizemeta := METAKEYSIZE + str(len(rawtext))
        flagmeta := METAKEYFLAG + str(flag)

        note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
        length of 1.
    """
    flog = filectx.filelog()
    frev = filectx.filerev()
    revlogflags = flog._revlog.flags(frev)
    # Flagged revisions (e.g. lfs) must be shipped as raw revlog data;
    # plain files go through the normal data() path.
    text = filectx.data() if revlogflags == 0 else flog.rawdata(frev)

    repo = filectx._repo
    ancestors = [filectx]

    try:
        # Speed up linkrev resolution while walking ancestors (see the
        # _adjustlinkrev wrapper in onetimesetup).
        repo.forcelinkrev = True
        ancestors.extend(filectx.ancestors())

        chunks = []
        for ancestorctx in ancestors:
            parents = ancestorctx.parents()
            p1 = parents[0].filenode() if len(parents) > 0 else repo.nullid
            p2 = parents[1].filenode() if len(parents) > 1 else repo.nullid

            rename = ancestorctx.renamed()
            copyname = rename[0] if rename else b""
            linknode = ancestorctx.node()
            chunks.append(
                b"%s%s%s%s%s\0"
                % (
                    ancestorctx.filenode(),
                    p1,
                    p2,
                    linknode,
                    copyname,
                )
            )
        ancestortext = b"".join(chunks)
    finally:
        repo.forcelinkrev = False

    header = shallowutil.buildfileblobheader(len(text), revlogflags)

    return b"%s\0%s%s" % (header, text, ancestortext)
410 409
411 410
def gcserver(ui, repo):
    """Garbage-collect the server-side remotefilelog blob cache.

    Keeps every blob referenced by the manifests of recent heads
    (``tip~25000`` onward) and removes any other cached file older than
    the configured ``remotefilelog.serverexpiration`` (in days).  No-op
    unless ``remotefilelog.server`` is enabled.
    """
    if not repo.ui.configbool(b"remotefilelog", b"server"):
        return

    neededfiles = set()
    heads = repo.revs(b"heads(tip~25000:) - null")

    cachepath = repo.vfs.join(b"remotefilelogcache")
    for head in heads:
        mf = repo[head].manifest()
        for filename, filenode in mf.items():
            filecachepath = os.path.join(cachepath, filename, hex(filenode))
            neededfiles.add(filecachepath)

    # delete unneeded older files
    days = repo.ui.configint(b"remotefilelog", b"serverexpiration")
    expiration = time.time() - (days * 24 * 60 * 60)

    progress = ui.makeprogress(_(b"removing old server cache"), unit=b"files")
    progress.update(0)
    for root, dirs, files in os.walk(cachepath):
        for name in files:
            filepath = os.path.join(root, name)
            progress.increment()
            if filepath in neededfiles:
                continue

            # ``st`` instead of ``stat``: the original local shadowed the
            # module-level ``import stat``.
            st = os.stat(filepath)
            if st.st_mtime < expiration:
                os.remove(filepath)

    progress.complete()
General Comments 0
You need to be logged in to leave comments. Login now