##// END OF EJS Templates
remotefilelogserver: remove pack-serving functionality...
Augie Fackler -
r40538:3b900876 default
parent child Browse files
Show More
@@ -1,554 +1,417 b''
1 1 # remotefilelogserver.py - server logic for a remotefilelog server
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import errno
10 10 import os
11 11 import stat
12 12 import time
13 13
14 14 from mercurial.i18n import _
15 from mercurial.node import bin, hex, nullid, nullrev
15 from mercurial.node import bin, hex, nullid
16 16 from mercurial import (
17 ancestor,
18 17 changegroup,
19 18 changelog,
20 19 context,
21 20 error,
22 21 extensions,
23 22 match,
24 pycompat,
25 23 store,
26 24 streamclone,
27 25 util,
28 26 wireprotoserver,
29 27 wireprototypes,
30 28 wireprotov1server,
31 29 )
32 30 from . import (
33 constants,
34 31 lz4wrapper,
35 32 shallowrepo,
36 33 shallowutil,
37 wirepack,
38 34 )
39 35
40 36 _sshv1server = wireprotoserver.sshv1protocolhandler
41 37
def setupserver(ui, repo):
    """Sets up a normal Mercurial repo so it can serve files to shallow repos.
    """
    onetimesetup(ui)

    # don't send files to shallow clients during pulls
    def generatefiles(orig, self, changedfiles, linknodes, commonrevs, source,
                      *args, **kwargs):
        # Wrapper for changegroup.cgpacker.generatefiles: when the pulling
        # client advertises the remotefilelog capability, strip out the files
        # matched by its include/exclude patterns so they are not bundled --
        # a shallow client fetches those file contents on demand instead.
        caps = self._bundlecaps or []
        if shallowrepo.requirement in caps:
            # only send files that don't match the specified patterns
            includepattern = None
            excludepattern = None
            for cap in (self._bundlecaps or []):
                if cap.startswith("includepattern="):
                    includepattern = cap[len("includepattern="):].split('\0')
                elif cap.startswith("excludepattern="):
                    excludepattern = cap[len("excludepattern="):].split('\0')

            # Default matcher matches everything, i.e. every file is shallow
            # unless narrowed by the patterns below.
            m = match.always(repo.root, '')
            if includepattern or excludepattern:
                m = match.match(repo.root, '', None,
                    includepattern, excludepattern)

            # keep only files NOT matched (those must still travel in full)
            changedfiles = list([f for f in changedfiles if not m(f)])
        return orig(self, changedfiles, linknodes, commonrevs, source,
                    *args, **kwargs)

    extensions.wrapfunction(
        changegroup.cgpacker, 'generatefiles', generatefiles)
72 68
73 69 onetime = False
def onetimesetup(ui):
    """Configures the wireprotocol for both clients and servers.

    Guarded by the module-level ``onetime`` flag so repeated setup calls only
    register the wire commands and install the wrappers once per process.
    """
    global onetime
    if onetime:
        return
    onetime = True

    # support file content requests
    wireprotov1server.wireprotocommand(
        'getflogheads', 'path', permission='pull')(getflogheads)
    wireprotov1server.wireprotocommand(
        'getfiles', '', permission='pull')(getfiles)
    wireprotov1server.wireprotocommand(
        'getfile', 'file node', permission='pull')(getfile)

    class streamstate(object):
        # Mutable holder shared (via closure) between stream_out_shallow and
        # the _walkstreamfiles wrapper below.
        match = None
        shallowremote = False
        noflatmf = False
    state = streamstate()

    def stream_out_shallow(repo, proto, other):
        # 'stream_out' variant honouring the shallow client's include/exclude
        # patterns, so excluded filelogs are not streamed.
        includepattern = None
        excludepattern = None
        raw = other.get('includepattern')
        if raw:
            includepattern = raw.split('\0')
        raw = other.get('excludepattern')
        if raw:
            excludepattern = raw.split('\0')

        # Save/restore the shared state so concurrent-looking reentry or a
        # raised exception cannot leave shallow mode switched on.
        oldshallow = state.shallowremote
        oldmatch = state.match
        oldnoflatmf = state.noflatmf
        try:
            state.shallowremote = True
            state.match = match.always(repo.root, '')
            state.noflatmf = other.get('noflatmanifest') == 'True'
            if includepattern or excludepattern:
                state.match = match.match(repo.root, '', None,
                    includepattern, excludepattern)
            streamres = wireprotov1server.stream(repo, proto)

            # Force the first value to execute, so the file list is computed
            # within the try/finally scope
            first = next(streamres.gen)
            second = next(streamres.gen)
            def gen():
                yield first
                yield second
                for value in streamres.gen:
                    yield value
            return wireprototypes.streamres(gen())
        finally:
            state.shallowremote = oldshallow
            state.match = oldmatch
            state.noflatmf = oldnoflatmf

    wireprotov1server.commands['stream_out_shallow'] = (stream_out_shallow, '*')

    # don't clone filelogs to shallow clients
    def _walkstreamfiles(orig, repo):
        if state.shallowremote:
            # if we are shallow ourselves, stream our local commits
            if shallowrepo.requirement in repo.requirements:
                striplen = len(repo.store.path) + 1
                readdir = repo.store.rawvfs.readdir
                visit = [os.path.join(repo.store.path, 'data')]
                # Iterative DFS of the store's data directory, yielding
                # everything that is not a revlog (.i/.d) file.
                while visit:
                    p = visit.pop()
                    for f, kind, st in readdir(p, stat=True):
                        fp = p + '/' + f
                        if kind == stat.S_IFREG:
                            if not fp.endswith('.i') and not fp.endswith('.d'):
                                n = util.pconvert(fp[striplen:])
                                yield (store.decodedir(n), n, st.st_size)
                        if kind == stat.S_IFDIR:
                            visit.append(fp)

            if 'treemanifest' in repo.requirements:
                # Tree manifest revlogs live under meta/ and must always be
                # streamed, even to shallow clients.
                for (u, e, s) in repo.store.datafiles():
                    if (u.startswith('meta/') and
                        (u.endswith('.i') or u.endswith('.d'))):
                        yield (u, e, s)

            # Return .d and .i files that do not match the shallow pattern
            match = state.match
            if match and not match.always():
                for (u, e, s) in repo.store.datafiles():
                    f = u[5:-2]  # trim data/...  and .i/.d
                    if not state.match(f):
                        yield (u, e, s)

            for x in repo.store.topfiles():
                # noflatmf clients build the flat manifest themselves
                if state.noflatmf and x[0][:11] == '00manifest.':
                    continue
                yield x

        elif shallowrepo.requirement in repo.requirements:
            # don't allow cloning from a shallow repo to a full repo
            # since it would require fetching every version of every
            # file in order to create the revlogs.
            raise error.Abort(_("Cannot clone from a shallow repo "
                                "to a full repo."))
        else:
            for x in orig(repo):
                yield x

    extensions.wrapfunction(streamclone, '_walkstreamfiles', _walkstreamfiles)

    # We no longer use getbundle_shallow commands, but we must still
    # support it for migration purposes
    def getbundleshallow(repo, proto, others):
        bundlecaps = others.get('bundlecaps', '')
        bundlecaps = set(bundlecaps.split(','))
        bundlecaps.add('remotefilelog')
        others['bundlecaps'] = ','.join(bundlecaps)

        return wireprotov1server.commands["getbundle"][0](repo, proto, others)

    wireprotov1server.commands["getbundle_shallow"] = (getbundleshallow, '*')

    # expose remotefilelog capabilities
    def _capabilities(orig, repo, proto):
        caps = orig(repo, proto)
        if ((shallowrepo.requirement in repo.requirements or
            ui.configbool('remotefilelog', 'server'))):
            if isinstance(proto, _sshv1server):
                # legacy getfiles method which only works over ssh
                caps.append(shallowrepo.requirement)
            caps.append('getflogheads')
            caps.append('getfile')
        return caps
    extensions.wrapfunction(wireprotov1server, '_capabilities', _capabilities)

    def _adjustlinkrev(orig, self, *args, **kwargs):
        # When generating file blobs, taking the real path is too slow on large
        # repos, so force it to just return the linkrev directly.
        repo = self._repo
        if util.safehasattr(repo, 'forcelinkrev') and repo.forcelinkrev:
            return self._filelog.linkrev(self._filelog.rev(self._filenode))
        return orig(self, *args, **kwargs)

    extensions.wrapfunction(
        context.basefilectx, '_adjustlinkrev', _adjustlinkrev)

    def _iscmd(orig, cmd):
        # 'getfiles' streams over a persistent connection and cannot be
        # served over the HTTP protocol, so hide it there.
        if cmd == 'getfiles':
            return False
        return orig(cmd)

    extensions.wrapfunction(wireprotoserver, 'iscmd', _iscmd)
229 223
def _loadfileblob(repo, cachepath, path, node):
    """Return the lz4-compressed file blob for ``path`` at ``node``.

    The blob is served from the on-disk server cache under ``cachepath``
    when present and non-empty; otherwise it is built via createfileblob,
    compressed, written back to the cache (best effort) and returned.
    """
    filecachepath = os.path.join(cachepath, path, hex(node))
    if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
        filectx = repo.filectx(path, fileid=node)
        if filectx.node() == nullid:
            # The in-memory changelog may be stale; reload it and retry the
            # lookup before building the blob.
            repo.changelog = changelog.changelog(repo.svfs)
            filectx = repo.filectx(path, fileid=node)

        text = createfileblob(filectx)
        text = lz4wrapper.lzcompresshc(text)

        # everything should be user & group read/writable
        oldumask = os.umask(0o002)
        try:
            dirname = os.path.dirname(filecachepath)
            if not os.path.exists(dirname):
                try:
                    os.makedirs(dirname)
                except OSError as ex:
                    # Another request may have created it concurrently.
                    if ex.errno != errno.EEXIST:
                        raise

            f = None
            try:
                # atomictempfile so readers never see a partial blob
                f = util.atomictempfile(filecachepath, "w")
                f.write(text)
            except (IOError, OSError):
                # Don't abort if the user only has permission to read,
                # and not write.
                pass
            finally:
                if f:
                    f.close()
        finally:
            os.umask(oldumask)
    else:
        with open(filecachepath, "r") as f:
            text = f.read()
    return text
269 263
def getflogheads(repo, proto, path):
    """A server api for requesting a filelog's heads.

    Returns the hex node of every non-null head of the filelog for
    ``path``, one per line.
    """
    filelog = repo.file(path)
    hexheads = [hex(h) for h in filelog.heads() if h != nullid]
    return '\n'.join(hexheads)
276 270
def getfile(repo, proto, file, node):
    """A server api for requesting a particular version of a file. Can be used
    in batches to request many files at once. The return protocol is:
    <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
    non-zero for an error.

    data is a compressed blob with revlog flag and ancestors information. See
    createfileblob for its content.
    """
    # A shallow server has no local file contents to hand out.
    if shallowrepo.requirement in repo.requirements:
        return '1\0' + _('cannot fetch remote files from shallow repo')

    cachepath = repo.ui.config("remotefilelog", "servercachepath")
    cachepath = cachepath or os.path.join(repo.path, "remotefilelogcache")

    node = bin(node.strip())
    # The null node has no content: success, empty payload.
    if node == nullid:
        return '0\0'
    return '0\0' + _loadfileblob(repo, cachepath, file, node)
295 289
def getfiles(repo, proto):
    """A server api for requesting particular versions of particular files.
    """
    if shallowrepo.requirement in repo.requirements:
        raise error.Abort(_('cannot fetch remote files from shallow repo'))
    if not isinstance(proto, _sshv1server):
        # getfiles streams over a persistent connection; ssh only.
        raise error.Abort(_('cannot fetch remote files over non-ssh protocol'))

    def streamer():
        # Each request line from the client is '<40-hex-node><path>'; an
        # empty line terminates the stream.  Each reply is '<len>\n<blob>'
        # ('0\n' for the null node).
        fin = proto._fin

        cachepath = repo.ui.config("remotefilelog", "servercachepath")
        if not cachepath:
            cachepath = os.path.join(repo.path, "remotefilelogcache")

        while True:
            request = fin.readline()[:-1]
            if not request:
                break

            node = bin(request[:40])
            if node == nullid:
                yield '0\n'
                continue

            path = request[40:]

            text = _loadfileblob(repo, cachepath, path, node)

            yield '%d\n%s' % (len(text), text)

            # it would be better to only flush after processing a whole batch
            # but currently we don't know if there are more requests coming
            proto._fout.flush()
    return wireprototypes.streamres(streamer())
331 325
def createfileblob(filectx):
    """
    format:
        v0:
            str(len(rawtext)) + '\0' + rawtext + ancestortext
        v1:
            'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
            metalist := metalist + '\n' + meta | meta
            meta := sizemeta | flagmeta
            sizemeta := METAKEYSIZE + str(len(rawtext))
            flagmeta := METAKEYFLAG + str(flag)

            note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
            length of 1.
    """
    flog = filectx.filelog()
    frev = filectx.filerev()
    revlogflags = flog._revlog.flags(frev)
    if revlogflags == 0:
        # normal files
        text = filectx.data()
    else:
        # lfs, read raw revision data
        text = flog.revision(frev, raw=True)

    repo = filectx._repo

    ancestors = [filectx]

    try:
        # forcelinkrev makes the wrapped _adjustlinkrev (installed in
        # onetimesetup) return the filelog linkrev directly, which is much
        # cheaper than the real linkrev walk on large repos.
        repo.forcelinkrev = True
        ancestors.extend([f for f in filectx.ancestors()])

        ancestortext = ""
        for ancestorctx in ancestors:
            parents = ancestorctx.parents()
            p1 = nullid
            p2 = nullid
            if len(parents) > 0:
                p1 = parents[0].filenode()
            if len(parents) > 1:
                p2 = parents[1].filenode()

            copyname = ""
            rename = ancestorctx.renamed()
            if rename:
                copyname = rename[0]
            linknode = ancestorctx.node()
            # One record per ancestor: filenode, p1, p2, linknode and the
            # copy-source name, NUL terminated.
            ancestortext += "%s%s%s%s%s\0" % (
                ancestorctx.filenode(), p1, p2, linknode,
                copyname)
    finally:
        repo.forcelinkrev = False

    header = shallowutil.buildfileblobheader(len(text), revlogflags)

    return "%s\0%s%s" % (header, text, ancestortext)
389 383
def gcserver(ui, repo):
    """Prune expired, unreferenced blobs from the server cache.

    Builds the set of cache paths still referenced by the manifests of
    recent heads (heads of the last ~25000 changesets), then walks the
    ``remotefilelogcache`` directory and removes every file that is both
    unreferenced and older than ``remotefilelog.serverexpiration`` days.
    No-op unless the repo is configured as a remotefilelog server.
    """
    if not repo.ui.configbool("remotefilelog", "server"):
        return

    neededfiles = set()
    heads = repo.revs("heads(tip~25000:) - null")

    cachepath = repo.vfs.join("remotefilelogcache")
    for head in heads:
        mf = repo[head].manifest()
        for filename, filenode in mf.iteritems():
            filecachepath = os.path.join(cachepath, filename, hex(filenode))
            neededfiles.add(filecachepath)

    # delete unneeded older files
    days = repo.ui.configint("remotefilelog", "serverexpiration")
    expiration = time.time() - (days * 24 * 60 * 60)

    _removing = _("removing old server cache")
    count = 0
    ui.progress(_removing, count, unit="files")
    for root, dirs, files in os.walk(cachepath):
        # 'entry' instead of 'file' to avoid shadowing the py2 builtin
        for entry in files:
            filepath = os.path.join(root, entry)
            count += 1
            ui.progress(_removing, count, unit="files")
            if filepath in neededfiles:
                continue

            # 'st' instead of 'stat' to avoid shadowing the stat module
            # imported at the top of this file.
            st = os.stat(filepath)
            if st.st_mtime < expiration:
                os.remove(filepath)

    ui.progress(_removing, None)
424
def getpack(repo, proto, args):
    """A server api for requesting a pack of file information.
    """
    if shallowrepo.requirement in repo.requirements:
        raise error.Abort(_('cannot fetch remote files from shallow repo'))
    if not isinstance(proto, _sshv1server):
        # getpack streams over a persistent connection; ssh only.
        raise error.Abort(_('cannot fetch remote files over non-ssh protocol'))

    def streamer():
        """Request format:

        [<filerequest>,...]\0\0
          filerequest = <filename len: 2 byte><filename><count: 4 byte>
                        [<node: 20 byte>,...]

        Response format:
        [<fileresponse>,...]<10 null bytes>
          fileresponse = <filename len: 2 byte><filename><history><deltas>
          history = <count: 4 byte>[<history entry>,...]
          historyentry = <node: 20 byte><p1: 20 byte><p2: 20 byte>
                         <linknode: 20 byte><copyfrom len: 2 byte><copyfrom>
          deltas = <count: 4 byte>[<delta entry>,...]
          deltaentry = <node: 20 byte><deltabase: 20 byte>
                       <delta len: 8 byte><delta>
        """
        fin = proto._fin
        files = _receivepackrequest(fin)

        # Sort the files by name, so we provide deterministic results
        for filename, nodes in sorted(files.iteritems()):
            fl = repo.file(filename)

            # Compute history
            history = []
            for rev in ancestor.lazyancestors(fl.parentrevs,
                                              [fl.rev(n) for n in nodes],
                                              inclusive=True):
                linkrev = fl.linkrev(rev)
                node = fl.node(rev)
                p1node, p2node = fl.parents(node)
                copyfrom = ''
                linknode = repo.changelog.node(linkrev)
                if p1node == nullid:
                    # A null p1 may mean the revision is a copy; surface the
                    # copy source as (copyfrom, copynode) in that case.
                    copydata = fl.renamed(node)
                    if copydata:
                        copyfrom, copynode = copydata
                        p1node = copynode

                history.append((node, p1node, p2node, linknode, copyfrom))

            # Scan and send deltas
            chain = _getdeltachain(fl, nodes, -1)

            for chunk in wirepack.sendpackpart(filename, history, chain):
                yield chunk

        yield wirepack.closepart()
        proto._fout.flush()

    return wireprototypes.streamres(streamer())
485
def _receivepackrequest(stream):
    """Parse a getpack request from ``stream``.

    The request is a sequence of records, each a 2-byte filename length,
    the filename, a 4-byte node count, and that many 20-byte nodes; a
    zero-length filename terminates the request.  Returns a dict mapping
    filename -> set of binary nodes.
    """
    requests = {}
    while True:
        namelen = shallowutil.readunpack(stream,
            constants.FILENAMESTRUCT)[0]
        # A zero-length filename marks the end of the request.
        if namelen == 0:
            break

        name = shallowutil.readexactly(stream, namelen)

        count = shallowutil.readunpack(stream,
            constants.PACKREQUESTCOUNTSTRUCT)[0]

        # Read all node bytes at once, then slice into fixed-size nodes.
        raw = shallowutil.readexactly(stream, constants.NODESIZE * count)
        requests[name] = set(
            raw[offset:offset + constants.NODESIZE]
            for offset in pycompat.xrange(0, len(raw), constants.NODESIZE))

    return requests
507
def _getdeltachain(fl, nodes, stophint):
    """Produces a chain of deltas that includes each of the given nodes.

    `stophint` - The changeset rev number to stop at. If it's set to >= 0, we
    will return not only the deltas for the requested nodes, but also all
    necessary deltas in their delta chains, as long as the deltas have link revs
    >= the stophint. This allows us to return an approximately minimal delta
    chain when the user performs a pull. If `stophint` is set to -1, all nodes
    will return full texts.

    Returns a list of (node, basenode, delta) tuples, ordered so that each
    delta base appears before the entries built on it (chain is reversed
    before returning).
    """
    chain = []

    seen = set()
    for node in nodes:
        startrev = fl.rev(node)
        cur = startrev
        # Walk down the delta chain from the requested rev until we hit a
        # full text, an already-emitted rev, or the stophint boundary.
        while True:
            if cur in seen:
                break
            base = fl._revlog.deltaparent(cur)
            linkrev = fl.linkrev(cur)
            node = fl.node(cur)
            p1, p2 = fl.parentrevs(cur)
            if linkrev < stophint and cur != startrev:
                break

            # Return a full text if:
            # - the caller requested it (via stophint == -1)
            # - the revlog chain has ended (via base==null or base==node)
            # - p1 is null. In some situations this can mean it's a copy, so
            #   we need to use fl.read() to remove the copymetadata.
            if (stophint == -1 or base == nullrev or base == cur
                or p1 == nullrev):
                delta = fl.read(cur)
                base = nullrev
            else:
                delta = fl._chunk(cur)

            basenode = fl.node(base)
            chain.append((node, basenode, delta))
            seen.add(cur)

            if base == nullrev:
                break
            cur = base

    chain.reverse()
    return chain
General Comments 0
You need to be logged in to leave comments. Login now