store: make `walk` return an entry for phase if requested so...
marmoute
r51405:a32d739b default
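This changeset threads a new `phase` keyword from `streamclone._walkstreamfiles` through `store.walk` down to `store.top_entries`, so that a stream-clone walk can also yield an entry for the `phaseroots` file. The sketch below is a hypothetical illustration (not part of the changeset) of how a caller could use the new keyword; it assumes a `repo` object whose store already carries this patch and reuses the entry/StoreFile API visible in the diffs.

    # hypothetical caller, mirroring what streamclone.generatev1 does,
    # but asking for the phase entry as well
    def collect_stream_entries(repo, matcher=None):
        entries = []
        total_bytes = 0
        # with phase=True, walk() also yields a volatile SimpleStoreEntry
        # for `phaseroots` (when that file exists), after the manifest
        # and changelog entries
        for entry in repo.store.walk(matcher, phase=True):
            for f in entry.files():
                size = f.file_size(repo.store.vfs)
                if size:
                    entries.append((f.unencoded_path, size))
                    total_bytes += size
        return entries, total_bytes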
@@ -1,442 +1,442 b''
1 1 # remotefilelogserver.py - server logic for a remotefilelog server
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import os
9 9 import stat
10 10 import time
11 11 import zlib
12 12
13 13 from mercurial.i18n import _
14 14 from mercurial.node import bin, hex
15 15 from mercurial.pycompat import open
16 16 from mercurial import (
17 17 changegroup,
18 18 changelog,
19 19 context,
20 20 error,
21 21 extensions,
22 22 match,
23 23 scmutil,
24 24 store,
25 25 streamclone,
26 26 util,
27 27 wireprotoserver,
28 28 wireprototypes,
29 29 wireprotov1server,
30 30 )
31 31 from . import (
32 32 constants,
33 33 shallowutil,
34 34 )
35 35
36 36 _sshv1server = wireprotoserver.sshv1protocolhandler
37 37
38 38
39 39 def setupserver(ui, repo):
40 40 """Sets up a normal Mercurial repo so it can serve files to shallow repos."""
41 41 onetimesetup(ui)
42 42
43 43 # don't send files to shallow clients during pulls
44 44 def generatefiles(
45 45 orig, self, changedfiles, linknodes, commonrevs, source, *args, **kwargs
46 46 ):
47 47 caps = self._bundlecaps or []
48 48 if constants.BUNDLE2_CAPABLITY in caps:
49 49 # only send files that don't match the specified patterns
50 50 includepattern = None
51 51 excludepattern = None
52 52 for cap in self._bundlecaps or []:
53 53 if cap.startswith(b"includepattern="):
54 54 includepattern = cap[len(b"includepattern=") :].split(b'\0')
55 55 elif cap.startswith(b"excludepattern="):
56 56 excludepattern = cap[len(b"excludepattern=") :].split(b'\0')
57 57
58 58 m = match.always()
59 59 if includepattern or excludepattern:
60 60 m = match.match(
61 61 repo.root, b'', None, includepattern, excludepattern
62 62 )
63 63
64 64 changedfiles = list([f for f in changedfiles if not m(f)])
65 65 return orig(
66 66 self, changedfiles, linknodes, commonrevs, source, *args, **kwargs
67 67 )
68 68
69 69 extensions.wrapfunction(
70 70 changegroup.cgpacker, b'generatefiles', generatefiles
71 71 )
72 72
73 73
74 74 onetime = False
75 75
76 76
77 77 def onetimesetup(ui):
78 78 """Configures the wireprotocol for both clients and servers."""
79 79 global onetime
80 80 if onetime:
81 81 return
82 82 onetime = True
83 83
84 84 # support file content requests
85 85 wireprotov1server.wireprotocommand(
86 86 b'x_rfl_getflogheads', b'path', permission=b'pull'
87 87 )(getflogheads)
88 88 wireprotov1server.wireprotocommand(
89 89 b'x_rfl_getfiles', b'', permission=b'pull'
90 90 )(getfiles)
91 91 wireprotov1server.wireprotocommand(
92 92 b'x_rfl_getfile', b'file node', permission=b'pull'
93 93 )(getfile)
94 94
95 95 class streamstate:
96 96 match = None
97 97 shallowremote = False
98 98 noflatmf = False
99 99
100 100 state = streamstate()
101 101
102 102 def stream_out_shallow(repo, proto, other):
103 103 includepattern = None
104 104 excludepattern = None
105 105 raw = other.get(b'includepattern')
106 106 if raw:
107 107 includepattern = raw.split(b'\0')
108 108 raw = other.get(b'excludepattern')
109 109 if raw:
110 110 excludepattern = raw.split(b'\0')
111 111
112 112 oldshallow = state.shallowremote
113 113 oldmatch = state.match
114 114 oldnoflatmf = state.noflatmf
115 115 try:
116 116 state.shallowremote = True
117 117 state.match = match.always()
118 118 state.noflatmf = other.get(b'noflatmanifest') == b'True'
119 119 if includepattern or excludepattern:
120 120 state.match = match.match(
121 121 repo.root, b'', None, includepattern, excludepattern
122 122 )
123 123 streamres = wireprotov1server.stream(repo, proto)
124 124
125 125 # Force the first value to execute, so the file list is computed
126 126 # within the try/finally scope
127 127 first = next(streamres.gen)
128 128 second = next(streamres.gen)
129 129
130 130 def gen():
131 131 yield first
132 132 yield second
133 133 for value in streamres.gen:
134 134 yield value
135 135
136 136 return wireprototypes.streamres(gen())
137 137 finally:
138 138 state.shallowremote = oldshallow
139 139 state.match = oldmatch
140 140 state.noflatmf = oldnoflatmf
141 141
142 142 wireprotov1server.commands[b'stream_out_shallow'] = (
143 143 stream_out_shallow,
144 144 b'*',
145 145 )
146 146
147 147 # don't clone filelogs to shallow clients
148 def _walkstreamfiles(orig, repo, matcher=None):
148 def _walkstreamfiles(orig, repo, matcher=None, phase=False):
149 149 if state.shallowremote:
150 150 # if we are shallow ourselves, stream our local commits
151 151 if shallowutil.isenabled(repo):
152 152 striplen = len(repo.store.path) + 1
153 153 readdir = repo.store.rawvfs.readdir
154 154 visit = [os.path.join(repo.store.path, b'data')]
155 155 while visit:
156 156 p = visit.pop()
157 157 for f, kind, st in readdir(p, stat=True):
158 158 fp = p + b'/' + f
159 159 if kind == stat.S_IFREG:
160 160 if not fp.endswith(b'.i') and not fp.endswith(
161 161 b'.d'
162 162 ):
163 163 n = util.pconvert(fp[striplen:])
164 164 d = store.decodedir(n)
165 165 yield store.SimpleStoreEntry(
166 166 entry_path=d,
167 167 is_volatile=False,
168 168 file_size=st.st_size,
169 169 )
170 170
171 171 if kind == stat.S_IFDIR:
172 172 visit.append(fp)
173 173
174 174 if scmutil.istreemanifest(repo):
175 175 for entry in repo.store.data_entries():
176 176 if not entry.is_revlog:
177 177 continue
178 178 if entry.is_manifestlog:
179 179 yield entry
180 180
181 181 # Return .d and .i files that do not match the shallow pattern
182 182 match = state.match
183 183 if match and not match.always():
184 184 for entry in repo.store.data_entries():
185 185 if not entry.is_revlog:
186 186 continue
187 187 if not state.match(entry.target_id):
188 188 yield entry
189 189
190 190 for x in repo.store.top_entries():
191 191 if state.noflatmf and x[1][:11] == b'00manifest.':
192 192 continue
193 193 yield x
194 194
195 195 elif shallowutil.isenabled(repo):
196 196 # don't allow cloning from a shallow repo to a full repo
197 197 # since it would require fetching every version of every
198 198 # file in order to create the revlogs.
199 199 raise error.Abort(
200 200 _(b"Cannot clone from a shallow repo to a full repo.")
201 201 )
202 202 else:
203 for x in orig(repo, matcher):
203 for x in orig(repo, matcher, phase=phase):
204 204 yield x
205 205
206 206 extensions.wrapfunction(streamclone, b'_walkstreamfiles', _walkstreamfiles)
207 207
208 208 # expose remotefilelog capabilities
209 209 def _capabilities(orig, repo, proto):
210 210 caps = orig(repo, proto)
211 211 if shallowutil.isenabled(repo) or ui.configbool(
212 212 b'remotefilelog', b'server'
213 213 ):
214 214 if isinstance(proto, _sshv1server):
215 215 # legacy getfiles method which only works over ssh
216 216 caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
217 217 caps.append(b'x_rfl_getflogheads')
218 218 caps.append(b'x_rfl_getfile')
219 219 return caps
220 220
221 221 extensions.wrapfunction(wireprotov1server, b'_capabilities', _capabilities)
222 222
223 223 def _adjustlinkrev(orig, self, *args, **kwargs):
224 224 # When generating file blobs, taking the real path is too slow on large
225 225 # repos, so force it to just return the linkrev directly.
226 226 repo = self._repo
227 227 if util.safehasattr(repo, b'forcelinkrev') and repo.forcelinkrev:
228 228 return self._filelog.linkrev(self._filelog.rev(self._filenode))
229 229 return orig(self, *args, **kwargs)
230 230
231 231 extensions.wrapfunction(
232 232 context.basefilectx, b'_adjustlinkrev', _adjustlinkrev
233 233 )
234 234
235 235 def _iscmd(orig, cmd):
236 236 if cmd == b'x_rfl_getfiles':
237 237 return False
238 238 return orig(cmd)
239 239
240 240 extensions.wrapfunction(wireprotoserver, b'iscmd', _iscmd)
241 241
242 242
243 243 def _loadfileblob(repo, cachepath, path, node):
244 244 filecachepath = os.path.join(cachepath, path, hex(node))
245 245 if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
246 246 filectx = repo.filectx(path, fileid=node)
247 247 if filectx.node() == repo.nullid:
248 248 repo.changelog = changelog.changelog(repo.svfs)
249 249 filectx = repo.filectx(path, fileid=node)
250 250
251 251 text = createfileblob(filectx)
252 252 # TODO configurable compression engines
253 253 text = zlib.compress(text)
254 254
255 255 # everything should be user & group read/writable
256 256 oldumask = os.umask(0o002)
257 257 try:
258 258 dirname = os.path.dirname(filecachepath)
259 259 if not os.path.exists(dirname):
260 260 try:
261 261 os.makedirs(dirname)
262 262 except FileExistsError:
263 263 pass
264 264
265 265 f = None
266 266 try:
267 267 f = util.atomictempfile(filecachepath, b"wb")
268 268 f.write(text)
269 269 except (IOError, OSError):
270 270 # Don't abort if the user only has permission to read,
271 271 # and not write.
272 272 pass
273 273 finally:
274 274 if f:
275 275 f.close()
276 276 finally:
277 277 os.umask(oldumask)
278 278 else:
279 279 with open(filecachepath, b"rb") as f:
280 280 text = f.read()
281 281 return text
282 282
283 283
284 284 def getflogheads(repo, proto, path):
285 285 """A server api for requesting a filelog's heads"""
286 286 flog = repo.file(path)
287 287 heads = flog.heads()
288 288 return b'\n'.join((hex(head) for head in heads if head != repo.nullid))
289 289
290 290
291 291 def getfile(repo, proto, file, node):
292 292 """A server api for requesting a particular version of a file. Can be used
293 293 in batches to request many files at once. The return protocol is:
294 294 <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
295 295 non-zero for an error.
296 296
297 297 data is a compressed blob with revlog flag and ancestors information. See
298 298 createfileblob for its content.
299 299 """
300 300 if shallowutil.isenabled(repo):
301 301 return b'1\0' + _(b'cannot fetch remote files from shallow repo')
302 302 cachepath = repo.ui.config(b"remotefilelog", b"servercachepath")
303 303 if not cachepath:
304 304 cachepath = os.path.join(repo.path, b"remotefilelogcache")
305 305 node = bin(node.strip())
306 306 if node == repo.nullid:
307 307 return b'0\0'
308 308 return b'0\0' + _loadfileblob(repo, cachepath, file, node)
309 309
310 310
311 311 def getfiles(repo, proto):
312 312 """A server api for requesting particular versions of particular files."""
313 313 if shallowutil.isenabled(repo):
314 314 raise error.Abort(_(b'cannot fetch remote files from shallow repo'))
315 315 if not isinstance(proto, _sshv1server):
316 316 raise error.Abort(_(b'cannot fetch remote files over non-ssh protocol'))
317 317
318 318 def streamer():
319 319 fin = proto._fin
320 320
321 321 cachepath = repo.ui.config(b"remotefilelog", b"servercachepath")
322 322 if not cachepath:
323 323 cachepath = os.path.join(repo.path, b"remotefilelogcache")
324 324
325 325 while True:
326 326 request = fin.readline()[:-1]
327 327 if not request:
328 328 break
329 329
330 330 node = bin(request[:40])
331 331 if node == repo.nullid:
332 332 yield b'0\n'
333 333 continue
334 334
335 335 path = request[40:]
336 336
337 337 text = _loadfileblob(repo, cachepath, path, node)
338 338
339 339 yield b'%d\n%s' % (len(text), text)
340 340
341 341 # it would be better to only flush after processing a whole batch
342 342 # but currently we don't know if there are more requests coming
343 343 proto._fout.flush()
344 344
345 345 return wireprototypes.streamres(streamer())
346 346
347 347
348 348 def createfileblob(filectx):
349 349 """
350 350 format:
351 351 v0:
352 352 str(len(rawtext)) + '\0' + rawtext + ancestortext
353 353 v1:
354 354 'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
355 355 metalist := metalist + '\n' + meta | meta
356 356 meta := sizemeta | flagmeta
357 357 sizemeta := METAKEYSIZE + str(len(rawtext))
358 358 flagmeta := METAKEYFLAG + str(flag)
359 359
360 360 note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
361 361 length of 1.
362 362 """
363 363 flog = filectx.filelog()
364 364 frev = filectx.filerev()
365 365 revlogflags = flog._revlog.flags(frev)
366 366 if revlogflags == 0:
367 367 # normal files
368 368 text = filectx.data()
369 369 else:
370 370 # lfs, read raw revision data
371 371 text = flog.rawdata(frev)
372 372
373 373 repo = filectx._repo
374 374
375 375 ancestors = [filectx]
376 376
377 377 try:
378 378 repo.forcelinkrev = True
379 379 ancestors.extend([f for f in filectx.ancestors()])
380 380
381 381 ancestortext = b""
382 382 for ancestorctx in ancestors:
383 383 parents = ancestorctx.parents()
384 384 p1 = repo.nullid
385 385 p2 = repo.nullid
386 386 if len(parents) > 0:
387 387 p1 = parents[0].filenode()
388 388 if len(parents) > 1:
389 389 p2 = parents[1].filenode()
390 390
391 391 copyname = b""
392 392 rename = ancestorctx.renamed()
393 393 if rename:
394 394 copyname = rename[0]
395 395 linknode = ancestorctx.node()
396 396 ancestortext += b"%s%s%s%s%s\0" % (
397 397 ancestorctx.filenode(),
398 398 p1,
399 399 p2,
400 400 linknode,
401 401 copyname,
402 402 )
403 403 finally:
404 404 repo.forcelinkrev = False
405 405
406 406 header = shallowutil.buildfileblobheader(len(text), revlogflags)
407 407
408 408 return b"%s\0%s%s" % (header, text, ancestortext)
409 409
410 410
411 411 def gcserver(ui, repo):
412 412 if not repo.ui.configbool(b"remotefilelog", b"server"):
413 413 return
414 414
415 415 neededfiles = set()
416 416 heads = repo.revs(b"heads(tip~25000:) - null")
417 417
418 418 cachepath = repo.vfs.join(b"remotefilelogcache")
419 419 for head in heads:
420 420 mf = repo[head].manifest()
421 421 for filename, filenode in mf.items():
422 422 filecachepath = os.path.join(cachepath, filename, hex(filenode))
423 423 neededfiles.add(filecachepath)
424 424
425 425 # delete unneeded older files
426 426 days = repo.ui.configint(b"remotefilelog", b"serverexpiration")
427 427 expiration = time.time() - (days * 24 * 60 * 60)
428 428
429 429 progress = ui.makeprogress(_(b"removing old server cache"), unit=b"files")
430 430 progress.update(0)
431 431 for root, dirs, files in os.walk(cachepath):
432 432 for file in files:
433 433 filepath = os.path.join(root, file)
434 434 progress.increment()
435 435 if filepath in neededfiles:
436 436 continue
437 437
438 438 stat = os.stat(filepath)
439 439 if stat.st_mtime < expiration:
440 440 os.remove(filepath)
441 441
442 442 progress.complete()
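The wrapped `_walkstreamfiles` above now accepts the new `phase` keyword and forwards it to the original implementation in the non-shallow case. Any other extension wrapping `streamclone._walkstreamfiles` needs the same signature adjustment; a minimal sketch of such hypothetical extension code (assuming the usual `extsetup` entry point):

    from mercurial import extensions, streamclone

    def _walkstreamfiles(orig, repo, matcher=None, phase=False):
        # accept the new keyword and pass it straight through, otherwise a
        # phaseroots entry requested by the caller would silently be dropped
        for entry in orig(repo, matcher, phase=phase):
            yield entry

    def extsetup(ui):
        extensions.wrapfunction(streamclone, b'_walkstreamfiles', _walkstreamfiles)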
@@ -1,1081 +1,1088 b''
1 1 # store.py - repository store handling for Mercurial
2 2 #
3 3 # Copyright 2008 Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import collections
9 9 import functools
10 10 import os
11 11 import re
12 12 import stat
13 13 from typing import Generator
14 14
15 15 from .i18n import _
16 16 from .pycompat import getattr
17 17 from .thirdparty import attr
18 18 from .node import hex
19 19 from . import (
20 20 changelog,
21 21 error,
22 22 manifest,
23 23 policy,
24 24 pycompat,
25 25 util,
26 26 vfs as vfsmod,
27 27 )
28 28 from .utils import hashutil
29 29
30 30 parsers = policy.importmod('parsers')
31 31 # how many bytes should be read from fncache in one read
32 32 # It is done to prevent loading large fncache files into memory
33 33 fncache_chunksize = 10 ** 6
34 34
35 35
36 36 def _match_tracked_entry(entry, matcher):
37 37 """parses a fncache entry and returns whether the entry is tracking a path
38 38 matched by matcher or not.
39 39
40 40 If matcher is None, returns True"""
41 41
42 42 if matcher is None:
43 43 return True
44 44 if entry.is_filelog:
45 45 return matcher(entry.target_id)
46 46 elif entry.is_manifestlog:
47 47 return matcher.visitdir(entry.target_id.rstrip(b'/'))
48 48 raise error.ProgrammingError(b"cannot process entry %r" % entry)
49 49
50 50
51 51 # This avoids a collision between a file named foo and a dir named
52 52 # foo.i or foo.d
53 53 def _encodedir(path):
54 54 """
55 55 >>> _encodedir(b'data/foo.i')
56 56 'data/foo.i'
57 57 >>> _encodedir(b'data/foo.i/bla.i')
58 58 'data/foo.i.hg/bla.i'
59 59 >>> _encodedir(b'data/foo.i.hg/bla.i')
60 60 'data/foo.i.hg.hg/bla.i'
61 61 >>> _encodedir(b'data/foo.i\\ndata/foo.i/bla.i\\ndata/foo.i.hg/bla.i\\n')
62 62 'data/foo.i\\ndata/foo.i.hg/bla.i\\ndata/foo.i.hg.hg/bla.i\\n'
63 63 """
64 64 return (
65 65 path.replace(b".hg/", b".hg.hg/")
66 66 .replace(b".i/", b".i.hg/")
67 67 .replace(b".d/", b".d.hg/")
68 68 )
69 69
70 70
71 71 encodedir = getattr(parsers, 'encodedir', _encodedir)
72 72
73 73
74 74 def decodedir(path):
75 75 """
76 76 >>> decodedir(b'data/foo.i')
77 77 'data/foo.i'
78 78 >>> decodedir(b'data/foo.i.hg/bla.i')
79 79 'data/foo.i/bla.i'
80 80 >>> decodedir(b'data/foo.i.hg.hg/bla.i')
81 81 'data/foo.i.hg/bla.i'
82 82 """
83 83 if b".hg/" not in path:
84 84 return path
85 85 return (
86 86 path.replace(b".d.hg/", b".d/")
87 87 .replace(b".i.hg/", b".i/")
88 88 .replace(b".hg.hg/", b".hg/")
89 89 )
90 90
91 91
92 92 def _reserved():
93 93 """characters that are problematic for filesystems
94 94
95 95 * ascii escapes (0..31)
96 96 * ascii hi (126..255)
97 97 * windows specials
98 98
99 99 these characters will be escaped by encodefunctions
100 100 """
101 101 winreserved = [ord(x) for x in u'\\:*?"<>|']
102 102 for x in range(32):
103 103 yield x
104 104 for x in range(126, 256):
105 105 yield x
106 106 for x in winreserved:
107 107 yield x
108 108
109 109
110 110 def _buildencodefun():
111 111 """
112 112 >>> enc, dec = _buildencodefun()
113 113
114 114 >>> enc(b'nothing/special.txt')
115 115 'nothing/special.txt'
116 116 >>> dec(b'nothing/special.txt')
117 117 'nothing/special.txt'
118 118
119 119 >>> enc(b'HELLO')
120 120 '_h_e_l_l_o'
121 121 >>> dec(b'_h_e_l_l_o')
122 122 'HELLO'
123 123
124 124 >>> enc(b'hello:world?')
125 125 'hello~3aworld~3f'
126 126 >>> dec(b'hello~3aworld~3f')
127 127 'hello:world?'
128 128
129 129 >>> enc(b'the\\x07quick\\xADshot')
130 130 'the~07quick~adshot'
131 131 >>> dec(b'the~07quick~adshot')
132 132 'the\\x07quick\\xadshot'
133 133 """
134 134 e = b'_'
135 135 xchr = pycompat.bytechr
136 136 asciistr = list(map(xchr, range(127)))
137 137 capitals = list(range(ord(b"A"), ord(b"Z") + 1))
138 138
139 139 cmap = {x: x for x in asciistr}
140 140 for x in _reserved():
141 141 cmap[xchr(x)] = b"~%02x" % x
142 142 for x in capitals + [ord(e)]:
143 143 cmap[xchr(x)] = e + xchr(x).lower()
144 144
145 145 dmap = {}
146 146 for k, v in cmap.items():
147 147 dmap[v] = k
148 148
149 149 def decode(s):
150 150 i = 0
151 151 while i < len(s):
152 152 for l in range(1, 4):
153 153 try:
154 154 yield dmap[s[i : i + l]]
155 155 i += l
156 156 break
157 157 except KeyError:
158 158 pass
159 159 else:
160 160 raise KeyError
161 161
162 162 return (
163 163 lambda s: b''.join([cmap[s[c : c + 1]] for c in range(len(s))]),
164 164 lambda s: b''.join(list(decode(s))),
165 165 )
166 166
167 167
168 168 _encodefname, _decodefname = _buildencodefun()
169 169
170 170
171 171 def encodefilename(s):
172 172 """
173 173 >>> encodefilename(b'foo.i/bar.d/bla.hg/hi:world?/HELLO')
174 174 'foo.i.hg/bar.d.hg/bla.hg.hg/hi~3aworld~3f/_h_e_l_l_o'
175 175 """
176 176 return _encodefname(encodedir(s))
177 177
178 178
179 179 def decodefilename(s):
180 180 """
181 181 >>> decodefilename(b'foo.i.hg/bar.d.hg/bla.hg.hg/hi~3aworld~3f/_h_e_l_l_o')
182 182 'foo.i/bar.d/bla.hg/hi:world?/HELLO'
183 183 """
184 184 return decodedir(_decodefname(s))
185 185
186 186
187 187 def _buildlowerencodefun():
188 188 """
189 189 >>> f = _buildlowerencodefun()
190 190 >>> f(b'nothing/special.txt')
191 191 'nothing/special.txt'
192 192 >>> f(b'HELLO')
193 193 'hello'
194 194 >>> f(b'hello:world?')
195 195 'hello~3aworld~3f'
196 196 >>> f(b'the\\x07quick\\xADshot')
197 197 'the~07quick~adshot'
198 198 """
199 199 xchr = pycompat.bytechr
200 200 cmap = {xchr(x): xchr(x) for x in range(127)}
201 201 for x in _reserved():
202 202 cmap[xchr(x)] = b"~%02x" % x
203 203 for x in range(ord(b"A"), ord(b"Z") + 1):
204 204 cmap[xchr(x)] = xchr(x).lower()
205 205
206 206 def lowerencode(s):
207 207 return b"".join([cmap[c] for c in pycompat.iterbytestr(s)])
208 208
209 209 return lowerencode
210 210
211 211
212 212 lowerencode = getattr(parsers, 'lowerencode', None) or _buildlowerencodefun()
213 213
214 214 # Windows reserved names: con, prn, aux, nul, com1..com9, lpt1..lpt9
215 215 _winres3 = (b'aux', b'con', b'prn', b'nul') # length 3
216 216 _winres4 = (b'com', b'lpt') # length 4 (with trailing 1..9)
217 217
218 218
219 219 def _auxencode(path, dotencode):
220 220 """
221 221 Encodes filenames containing names reserved by Windows or which end in
222 222 period or space. Does not touch other single reserved characters c.
223 223 Specifically, c in '\\:*?"<>|' or ord(c) <= 31 are *not* encoded here.
224 224 Additionally encodes space or period at the beginning, if dotencode is
225 225 True. Parameter path is assumed to be all lowercase.
226 226 A segment only needs encoding if a reserved name appears as a
227 227 basename (e.g. "aux", "aux.foo"). A directory or file named "foo.aux"
228 228 doesn't need encoding.
229 229
230 230 >>> s = b'.foo/aux.txt/txt.aux/con/prn/nul/foo.'
231 231 >>> _auxencode(s.split(b'/'), True)
232 232 ['~2efoo', 'au~78.txt', 'txt.aux', 'co~6e', 'pr~6e', 'nu~6c', 'foo~2e']
233 233 >>> s = b'.com1com2/lpt9.lpt4.lpt1/conprn/com0/lpt0/foo.'
234 234 >>> _auxencode(s.split(b'/'), False)
235 235 ['.com1com2', 'lp~749.lpt4.lpt1', 'conprn', 'com0', 'lpt0', 'foo~2e']
236 236 >>> _auxencode([b'foo. '], True)
237 237 ['foo.~20']
238 238 >>> _auxencode([b' .foo'], True)
239 239 ['~20.foo']
240 240 """
241 241 for i, n in enumerate(path):
242 242 if not n:
243 243 continue
244 244 if dotencode and n[0] in b'. ':
245 245 n = b"~%02x" % ord(n[0:1]) + n[1:]
246 246 path[i] = n
247 247 else:
248 248 l = n.find(b'.')
249 249 if l == -1:
250 250 l = len(n)
251 251 if (l == 3 and n[:3] in _winres3) or (
252 252 l == 4
253 253 and n[3:4] <= b'9'
254 254 and n[3:4] >= b'1'
255 255 and n[:3] in _winres4
256 256 ):
257 257 # encode third letter ('aux' -> 'au~78')
258 258 ec = b"~%02x" % ord(n[2:3])
259 259 n = n[0:2] + ec + n[3:]
260 260 path[i] = n
261 261 if n[-1] in b'. ':
262 262 # encode last period or space ('foo...' -> 'foo..~2e')
263 263 path[i] = n[:-1] + b"~%02x" % ord(n[-1:])
264 264 return path
265 265
266 266
267 267 _maxstorepathlen = 120
268 268 _dirprefixlen = 8
269 269 _maxshortdirslen = 8 * (_dirprefixlen + 1) - 4
270 270
271 271
272 272 def _hashencode(path, dotencode):
273 273 digest = hex(hashutil.sha1(path).digest())
274 274 le = lowerencode(path[5:]).split(b'/') # skips prefix 'data/' or 'meta/'
275 275 parts = _auxencode(le, dotencode)
276 276 basename = parts[-1]
277 277 _root, ext = os.path.splitext(basename)
278 278 sdirs = []
279 279 sdirslen = 0
280 280 for p in parts[:-1]:
281 281 d = p[:_dirprefixlen]
282 282 if d[-1] in b'. ':
283 283 # Windows can't access dirs ending in period or space
284 284 d = d[:-1] + b'_'
285 285 if sdirslen == 0:
286 286 t = len(d)
287 287 else:
288 288 t = sdirslen + 1 + len(d)
289 289 if t > _maxshortdirslen:
290 290 break
291 291 sdirs.append(d)
292 292 sdirslen = t
293 293 dirs = b'/'.join(sdirs)
294 294 if len(dirs) > 0:
295 295 dirs += b'/'
296 296 res = b'dh/' + dirs + digest + ext
297 297 spaceleft = _maxstorepathlen - len(res)
298 298 if spaceleft > 0:
299 299 filler = basename[:spaceleft]
300 300 res = b'dh/' + dirs + filler + digest + ext
301 301 return res
302 302
303 303
304 304 def _hybridencode(path, dotencode):
305 305 """encodes path with a length limit
306 306
307 307 Encodes all paths that begin with 'data/', according to the following.
308 308
309 309 Default encoding (reversible):
310 310
311 311 Encodes all uppercase letters 'X' as '_x'. All reserved or illegal
312 312 characters are encoded as '~xx', where xx is the two digit hex code
313 313 of the character (see encodefilename).
314 314 Relevant path components consisting of Windows reserved filenames are
315 315 masked by encoding the third character ('aux' -> 'au~78', see _auxencode).
316 316
317 317 Hashed encoding (not reversible):
318 318
319 319 If the default-encoded path is longer than _maxstorepathlen, a
320 320 non-reversible hybrid hashing of the path is done instead.
321 321 This encoding uses up to _dirprefixlen characters of all directory
322 322 levels of the lowerencoded path, but not more levels than can fit into
323 323 _maxshortdirslen.
324 324 Then follows the filler followed by the sha digest of the full path.
325 325 The filler is the beginning of the basename of the lowerencoded path
326 326 (the basename is everything after the last path separator). The filler
327 327 is as long as possible, filling in characters from the basename until
328 328 the encoded path has _maxstorepathlen characters (or all chars of the
329 329 basename have been taken).
330 330 The extension (e.g. '.i' or '.d') is preserved.
331 331
332 332 The string 'data/' at the beginning is replaced with 'dh/', if the hashed
333 333 encoding was used.
334 334 """
335 335 path = encodedir(path)
336 336 ef = _encodefname(path).split(b'/')
337 337 res = b'/'.join(_auxencode(ef, dotencode))
338 338 if len(res) > _maxstorepathlen:
339 339 res = _hashencode(path, dotencode)
340 340 return res
341 341
342 342
343 343 def _pathencode(path):
344 344 de = encodedir(path)
345 345 if len(path) > _maxstorepathlen:
346 346 return _hashencode(de, True)
347 347 ef = _encodefname(de).split(b'/')
348 348 res = b'/'.join(_auxencode(ef, True))
349 349 if len(res) > _maxstorepathlen:
350 350 return _hashencode(de, True)
351 351 return res
352 352
353 353
354 354 _pathencode = getattr(parsers, 'pathencode', _pathencode)
355 355
356 356
357 357 def _plainhybridencode(f):
358 358 return _hybridencode(f, False)
359 359
360 360
361 361 def _calcmode(vfs):
362 362 try:
363 363 # files in .hg/ will be created using this mode
364 364 mode = vfs.stat().st_mode
365 365 # avoid some useless chmods
366 366 if (0o777 & ~util.umask) == (0o777 & mode):
367 367 mode = None
368 368 except OSError:
369 369 mode = None
370 370 return mode
371 371
372 372
373 373 _data = [
374 374 b'bookmarks',
375 375 b'narrowspec',
376 376 b'data',
377 377 b'meta',
378 378 b'00manifest.d',
379 379 b'00manifest.i',
380 380 b'00changelog.d',
381 381 b'00changelog.i',
382 382 b'phaseroots',
383 383 b'obsstore',
384 384 b'requires',
385 385 ]
386 386
387 387 REVLOG_FILES_MAIN_EXT = (b'.i',)
388 388 REVLOG_FILES_OTHER_EXT = (
389 389 b'.idx',
390 390 b'.d',
391 391 b'.dat',
392 392 b'.n',
393 393 b'.nd',
394 394 b'.sda',
395 395 )
396 396 # file extensions that also use a `-SOMELONGIDHASH.ext` form
397 397 REVLOG_FILES_LONG_EXT = (
398 398 b'.nd',
399 399 b'.idx',
400 400 b'.dat',
401 401 b'.sda',
402 402 )
403 403 # files that are "volatile" and might change between listing and streaming
404 404 #
405 405 # note: the ".nd" files are nodemap data and won't "change" but they might be
406 406 # deleted.
407 407 REVLOG_FILES_VOLATILE_EXT = (b'.n', b'.nd')
408 408
409 409 # some exception to the above matching
410 410 #
411 411 # XXX This is currently not in use because of issue6542
412 412 EXCLUDED = re.compile(br'.*undo\.[^/]+\.(nd?|i)$')
413 413
414 414
415 415 def is_revlog(f, kind, st):
416 416 if kind != stat.S_IFREG:
417 417 return None
418 418 return revlog_type(f)
419 419
420 420
421 421 def revlog_type(f):
422 422 # XXX we need to filter `undo.` created by the transaction here, however
423 423 # being naive about it also filter revlog for `undo.*` files, leading to
424 424 # issue6542. So we no longer use EXCLUDED.
425 425 if f.endswith(REVLOG_FILES_MAIN_EXT):
426 426 return FILEFLAGS_REVLOG_MAIN
427 427 elif f.endswith(REVLOG_FILES_OTHER_EXT):
428 428 t = FILETYPE_FILELOG_OTHER
429 429 if f.endswith(REVLOG_FILES_VOLATILE_EXT):
430 430 t |= FILEFLAGS_VOLATILE
431 431 return t
432 432 return None
433 433
434 434
435 435 # the file is part of changelog data
436 436 FILEFLAGS_CHANGELOG = 1 << 13
437 437 # the file is part of manifest data
438 438 FILEFLAGS_MANIFESTLOG = 1 << 12
439 439 # the file is part of filelog data
440 440 FILEFLAGS_FILELOG = 1 << 11
441 441 # files that are not directly part of a revlog
442 442 FILEFLAGS_OTHER = 1 << 10
443 443
444 444 # the main entry point for a revlog
445 445 FILEFLAGS_REVLOG_MAIN = 1 << 1
446 446 # a secondary file for a revlog
447 447 FILEFLAGS_REVLOG_OTHER = 1 << 0
448 448
449 449 # files that are "volatile" and might change between listing and streaming
450 450 FILEFLAGS_VOLATILE = 1 << 20
451 451
452 452 FILETYPE_CHANGELOG_MAIN = FILEFLAGS_CHANGELOG | FILEFLAGS_REVLOG_MAIN
453 453 FILETYPE_CHANGELOG_OTHER = FILEFLAGS_CHANGELOG | FILEFLAGS_REVLOG_OTHER
454 454 FILETYPE_MANIFESTLOG_MAIN = FILEFLAGS_MANIFESTLOG | FILEFLAGS_REVLOG_MAIN
455 455 FILETYPE_MANIFESTLOG_OTHER = FILEFLAGS_MANIFESTLOG | FILEFLAGS_REVLOG_OTHER
456 456 FILETYPE_FILELOG_MAIN = FILEFLAGS_FILELOG | FILEFLAGS_REVLOG_MAIN
457 457 FILETYPE_FILELOG_OTHER = FILEFLAGS_FILELOG | FILEFLAGS_REVLOG_OTHER
458 458 FILETYPE_OTHER = FILEFLAGS_OTHER
459 459
460 460
461 461 @attr.s(slots=True, init=False)
462 462 class BaseStoreEntry:
463 463 """An entry in the store
464 464
465 465 This is returned by `store.walk` and represents some data in the store."""
466 466
467 467
468 468 @attr.s(slots=True, init=False)
469 469 class SimpleStoreEntry(BaseStoreEntry):
470 470 """A generic entry in the store"""
471 471
472 472 is_revlog = False
473 473
474 474 _entry_path = attr.ib()
475 475 _is_volatile = attr.ib(default=False)
476 476 _file_size = attr.ib(default=None)
477 477
478 478 def __init__(
479 479 self,
480 480 entry_path,
481 481 is_volatile=False,
482 482 file_size=None,
483 483 ):
484 484 super().__init__()
485 485 self._entry_path = entry_path
486 486 self._is_volatile = is_volatile
487 487 self._file_size = file_size
488 488
489 489 def files(self):
490 490 return [
491 491 StoreFile(
492 492 unencoded_path=self._entry_path,
493 493 file_size=self._file_size,
494 494 is_volatile=self._is_volatile,
495 495 )
496 496 ]
497 497
498 498
499 499 @attr.s(slots=True, init=False)
500 500 class RevlogStoreEntry(BaseStoreEntry):
501 501 """A revlog entry in the store"""
502 502
503 503 is_revlog = True
504 504
505 505 revlog_type = attr.ib(default=None)
506 506 target_id = attr.ib(default=None)
507 507 _path_prefix = attr.ib(default=None)
508 508 _details = attr.ib(default=None)
509 509
510 510 def __init__(
511 511 self,
512 512 revlog_type,
513 513 path_prefix,
514 514 target_id,
515 515 details,
516 516 ):
517 517 super().__init__()
518 518 self.revlog_type = revlog_type
519 519 self.target_id = target_id
520 520 self._path_prefix = path_prefix
521 521 assert b'.i' in details, (path_prefix, details)
522 522 self._details = details
523 523
524 524 @property
525 525 def is_changelog(self):
526 526 return self.revlog_type & FILEFLAGS_CHANGELOG
527 527
528 528 @property
529 529 def is_manifestlog(self):
530 530 return self.revlog_type & FILEFLAGS_MANIFESTLOG
531 531
532 532 @property
533 533 def is_filelog(self):
534 534 return self.revlog_type & FILEFLAGS_FILELOG
535 535
536 536 def main_file_path(self):
537 537 """unencoded path of the main revlog file"""
538 538 return self._path_prefix + b'.i'
539 539
540 540 def files(self):
541 541 files = []
542 542 for ext in sorted(self._details, key=_ext_key):
543 543 path = self._path_prefix + ext
544 544 data = self._details[ext]
545 545 files.append(StoreFile(unencoded_path=path, **data))
546 546 return files
547 547
548 548
549 549 @attr.s(slots=True)
550 550 class StoreFile:
551 551 """a file matching an entry"""
552 552
553 553 unencoded_path = attr.ib()
554 554 _file_size = attr.ib(default=None)
555 555 is_volatile = attr.ib(default=False)
556 556
557 557 def file_size(self, vfs):
558 558 if self._file_size is not None:
559 559 return self._file_size
560 560 try:
561 561 return vfs.stat(self.unencoded_path).st_size
562 562 except FileNotFoundError:
563 563 return 0
564 564
565 565
566 566 def _gather_revlog(files_data):
567 567 """group files per revlog prefix
568 568
569 569 This returns a two-level nested dict. The top level key is the revlog prefix
570 570 without extension; the second level maps every file "suffix" seen for
571 571 this revlog to arbitrary file data.
572 572 """
573 573 revlogs = collections.defaultdict(dict)
574 574 for u, value in files_data:
575 575 name, ext = _split_revlog_ext(u)
576 576 revlogs[name][ext] = value
577 577 return sorted(revlogs.items())
578 578
579 579
580 580 def _split_revlog_ext(filename):
581 581 """split the revlog file prefix from the variable extension"""
582 582 if filename.endswith(REVLOG_FILES_LONG_EXT):
583 583 char = b'-'
584 584 else:
585 585 char = b'.'
586 586 idx = filename.rfind(char)
587 587 return filename[:idx], filename[idx:]
588 588
589 589
590 590 def _ext_key(ext):
591 591 """a key to order revlog suffix
592 592
593 593 it is important to issue .i after other entries."""
594 594 # the only important part of this order is to keep the `.i` last.
595 595 if ext.endswith(b'.n'):
596 596 return (0, ext)
597 597 elif ext.endswith(b'.nd'):
598 598 return (10, ext)
599 599 elif ext.endswith(b'.d'):
600 600 return (20, ext)
601 601 elif ext.endswith(b'.i'):
602 602 return (50, ext)
603 603 else:
604 604 return (40, ext)
605 605
606 606
607 607 class basicstore:
608 608 '''base class for local repository stores'''
609 609
610 610 def __init__(self, path, vfstype):
611 611 vfs = vfstype(path)
612 612 self.path = vfs.base
613 613 self.createmode = _calcmode(vfs)
614 614 vfs.createmode = self.createmode
615 615 self.rawvfs = vfs
616 616 self.vfs = vfsmod.filtervfs(vfs, encodedir)
617 617 self.opener = self.vfs
618 618
619 619 def join(self, f):
620 620 return self.path + b'/' + encodedir(f)
621 621
622 622 def _walk(self, relpath, recurse, undecodable=None):
623 623 '''yields (revlog_type, unencoded, size)'''
624 624 path = self.path
625 625 if relpath:
626 626 path += b'/' + relpath
627 627 striplen = len(self.path) + 1
628 628 l = []
629 629 if self.rawvfs.isdir(path):
630 630 visit = [path]
631 631 readdir = self.rawvfs.readdir
632 632 while visit:
633 633 p = visit.pop()
634 634 for f, kind, st in readdir(p, stat=True):
635 635 fp = p + b'/' + f
636 636 rl_type = is_revlog(f, kind, st)
637 637 if rl_type is not None:
638 638 n = util.pconvert(fp[striplen:])
639 639 l.append((decodedir(n), (rl_type, st.st_size)))
640 640 elif kind == stat.S_IFDIR and recurse:
641 641 visit.append(fp)
642 642
643 643 l.sort()
644 644 return l
645 645
646 646 def changelog(self, trypending, concurrencychecker=None):
647 647 return changelog.changelog(
648 648 self.vfs,
649 649 trypending=trypending,
650 650 concurrencychecker=concurrencychecker,
651 651 )
652 652
653 653 def manifestlog(self, repo, storenarrowmatch):
654 654 rootstore = manifest.manifestrevlog(repo.nodeconstants, self.vfs)
655 655 return manifest.manifestlog(self.vfs, repo, rootstore, storenarrowmatch)
656 656
657 657 def data_entries(
658 658 self, matcher=None, undecodable=None
659 659 ) -> Generator[BaseStoreEntry, None, None]:
660 660 """Like walk, but excluding the changelog and root manifest.
661 661
662 662 When [undecodable] is None, revlog names that can't be
663 663 decoded cause an exception. When it is provided, it should
664 664 be a list and the filenames that can't be decoded are added
665 665 to it instead. This is very rarely needed."""
666 666 dirs = [
667 667 (b'data', FILEFLAGS_FILELOG),
668 668 (b'meta', FILEFLAGS_MANIFESTLOG),
669 669 ]
670 670 for base_dir, rl_type in dirs:
671 671 files = self._walk(base_dir, True, undecodable=undecodable)
672 672 files = (f for f in files if f[1][0] is not None)
673 673 for revlog, details in _gather_revlog(files):
674 674 file_details = {}
675 675 revlog_target_id = revlog.split(b'/', 1)[1]
676 676 for ext, (t, s) in sorted(details.items()):
677 677 file_details[ext] = {
678 678 'is_volatile': bool(t & FILEFLAGS_VOLATILE),
679 679 'file_size': s,
680 680 }
681 681 yield RevlogStoreEntry(
682 682 path_prefix=revlog,
683 683 revlog_type=rl_type,
684 684 target_id=revlog_target_id,
685 685 details=file_details,
686 686 )
687 687
688 def top_entries(self) -> Generator[BaseStoreEntry, None, None]:
688 def top_entries(self, phase=False) -> Generator[BaseStoreEntry, None, None]:
689 689 files = reversed(self._walk(b'', False))
690 690
691 691 changelogs = collections.defaultdict(dict)
692 692 manifestlogs = collections.defaultdict(dict)
693 693
694 694 for u, (t, s) in files:
695 695 if u.startswith(b'00changelog'):
696 696 name, ext = _split_revlog_ext(u)
697 697 changelogs[name][ext] = (t, s)
698 698 elif u.startswith(b'00manifest'):
699 699 name, ext = _split_revlog_ext(u)
700 700 manifestlogs[name][ext] = (t, s)
701 701 else:
702 702 yield SimpleStoreEntry(
703 703 entry_path=u,
704 704 is_volatile=bool(t & FILEFLAGS_VOLATILE),
705 705 file_size=s,
706 706 )
707 707 # yield manifest before changelog
708 708 top_rl = [
709 709 (manifestlogs, FILEFLAGS_MANIFESTLOG),
710 710 (changelogs, FILEFLAGS_CHANGELOG),
711 711 ]
712 712 assert len(manifestlogs) <= 1
713 713 assert len(changelogs) <= 1
714 714 for data, revlog_type in top_rl:
715 715 for revlog, details in sorted(data.items()):
716 716 file_details = {}
717 717 for ext, (t, s) in details.items():
718 718 file_details[ext] = {
719 719 'is_volatile': bool(t & FILEFLAGS_VOLATILE),
720 720 'file_size': s,
721 721 }
722 722 yield RevlogStoreEntry(
723 723 path_prefix=revlog,
724 724 revlog_type=revlog_type,
725 725 target_id=b'',
726 726 details=file_details,
727 727 )
728 if phase and self.vfs.exists(b'phaseroots'):
729 yield SimpleStoreEntry(
730 entry_path=b'phaseroots',
731 is_volatile=True,
732 )
728 733
729 def walk(self, matcher=None) -> Generator[BaseStoreEntry, None, None]:
734 def walk(
735 self, matcher=None, phase=False
736 ) -> Generator[BaseStoreEntry, None, None]:
730 737 """return files related to data storage (ie: revlogs)
731 738
732 yields (file_type, unencoded, size)
739 yields instances of BaseStoreEntry subclasses
733 740
734 741 if a matcher is passed, only storage files of the tracked paths
735 742 that match the matcher are yielded
736 743 """
737 744 # yield data files first
738 745 for x in self.data_entries(matcher):
739 746 yield x
740 for x in self.top_entries():
747 for x in self.top_entries(phase=phase):
741 748 yield x
742 749
743 750 def copylist(self):
744 751 return _data
745 752
746 753 def write(self, tr):
747 754 pass
748 755
749 756 def invalidatecaches(self):
750 757 pass
751 758
752 759 def markremoved(self, fn):
753 760 pass
754 761
755 762 def __contains__(self, path):
756 763 '''Checks if the store contains path'''
757 764 path = b"/".join((b"data", path))
758 765 # file?
759 766 if self.vfs.exists(path + b".i"):
760 767 return True
761 768 # dir?
762 769 if not path.endswith(b"/"):
763 770 path = path + b"/"
764 771 return self.vfs.exists(path)
765 772
766 773
767 774 class encodedstore(basicstore):
768 775 def __init__(self, path, vfstype):
769 776 vfs = vfstype(path + b'/store')
770 777 self.path = vfs.base
771 778 self.createmode = _calcmode(vfs)
772 779 vfs.createmode = self.createmode
773 780 self.rawvfs = vfs
774 781 self.vfs = vfsmod.filtervfs(vfs, encodefilename)
775 782 self.opener = self.vfs
776 783
777 784 def _walk(self, relpath, recurse, undecodable=None):
778 785 old = super()._walk(relpath, recurse)
779 786 new = []
780 787 for f1, value in old:
781 788 try:
782 789 f2 = decodefilename(f1)
783 790 except KeyError:
784 791 if undecodable is None:
785 792 msg = _(b'undecodable revlog name %s') % f1
786 793 raise error.StorageError(msg)
787 794 else:
788 795 undecodable.append(f1)
789 796 continue
790 797 new.append((f2, value))
791 798 return new
792 799
793 800 def data_entries(
794 801 self, matcher=None, undecodable=None
795 802 ) -> Generator[BaseStoreEntry, None, None]:
796 803 entries = super(encodedstore, self).data_entries(
797 804 undecodable=undecodable
798 805 )
799 806 for entry in entries:
800 807 if _match_tracked_entry(entry, matcher):
801 808 yield entry
802 809
803 810 def join(self, f):
804 811 return self.path + b'/' + encodefilename(f)
805 812
806 813 def copylist(self):
807 814 return [b'requires', b'00changelog.i'] + [b'store/' + f for f in _data]
808 815
809 816
810 817 class fncache:
811 818 # the filename used to be partially encoded
812 819 # hence the encodedir/decodedir dance
813 820 def __init__(self, vfs):
814 821 self.vfs = vfs
815 822 self._ignores = set()
816 823 self.entries = None
817 824 self._dirty = False
818 825 # set of new additions to fncache
819 826 self.addls = set()
820 827
821 828 def ensureloaded(self, warn=None):
822 829 """read the fncache file if not already read.
823 830
824 831 If the file on disk is corrupted, raise. If warn is provided,
825 832 warn and keep going instead."""
826 833 if self.entries is None:
827 834 self._load(warn)
828 835
829 836 def _load(self, warn=None):
830 837 '''fill the entries from the fncache file'''
831 838 self._dirty = False
832 839 try:
833 840 fp = self.vfs(b'fncache', mode=b'rb')
834 841 except IOError:
835 842 # skip nonexistent file
836 843 self.entries = set()
837 844 return
838 845
839 846 self.entries = set()
840 847 chunk = b''
841 848 for c in iter(functools.partial(fp.read, fncache_chunksize), b''):
842 849 chunk += c
843 850 try:
844 851 p = chunk.rindex(b'\n')
845 852 self.entries.update(decodedir(chunk[: p + 1]).splitlines())
846 853 chunk = chunk[p + 1 :]
847 854 except ValueError:
848 855 # substring '\n' not found, maybe the entry is bigger than the
849 856 # chunksize, so let's keep iterating
850 857 pass
851 858
852 859 if chunk:
853 860 msg = _(b"fncache does not ends with a newline")
854 861 if warn:
855 862 warn(msg + b'\n')
856 863 else:
857 864 raise error.Abort(
858 865 msg,
859 866 hint=_(
860 867 b"use 'hg debugrebuildfncache' to "
861 868 b"rebuild the fncache"
862 869 ),
863 870 )
864 871 self._checkentries(fp, warn)
865 872 fp.close()
866 873
867 874 def _checkentries(self, fp, warn):
868 875 """make sure there is no empty string in entries"""
869 876 if b'' in self.entries:
870 877 fp.seek(0)
871 878 for n, line in enumerate(fp):
872 879 if not line.rstrip(b'\n'):
873 880 t = _(b'invalid entry in fncache, line %d') % (n + 1)
874 881 if warn:
875 882 warn(t + b'\n')
876 883 else:
877 884 raise error.Abort(t)
878 885
879 886 def write(self, tr):
880 887 if self._dirty:
881 888 assert self.entries is not None
882 889 self.entries = self.entries | self.addls
883 890 self.addls = set()
884 891 tr.addbackup(b'fncache')
885 892 fp = self.vfs(b'fncache', mode=b'wb', atomictemp=True)
886 893 if self.entries:
887 894 fp.write(encodedir(b'\n'.join(self.entries) + b'\n'))
888 895 fp.close()
889 896 self._dirty = False
890 897 if self.addls:
891 898 # if we have just new entries, let's append them to the fncache
892 899 tr.addbackup(b'fncache')
893 900 fp = self.vfs(b'fncache', mode=b'ab', atomictemp=True)
894 901 if self.addls:
895 902 fp.write(encodedir(b'\n'.join(self.addls) + b'\n'))
896 903 fp.close()
897 904 self.entries = None
898 905 self.addls = set()
899 906
900 907 def addignore(self, fn):
901 908 self._ignores.add(fn)
902 909
903 910 def add(self, fn):
904 911 if fn in self._ignores:
905 912 return
906 913 if self.entries is None:
907 914 self._load()
908 915 if fn not in self.entries:
909 916 self.addls.add(fn)
910 917
911 918 def remove(self, fn):
912 919 if self.entries is None:
913 920 self._load()
914 921 if fn in self.addls:
915 922 self.addls.remove(fn)
916 923 return
917 924 try:
918 925 self.entries.remove(fn)
919 926 self._dirty = True
920 927 except KeyError:
921 928 pass
922 929
923 930 def __contains__(self, fn):
924 931 if fn in self.addls:
925 932 return True
926 933 if self.entries is None:
927 934 self._load()
928 935 return fn in self.entries
929 936
930 937 def __iter__(self):
931 938 if self.entries is None:
932 939 self._load()
933 940 return iter(self.entries | self.addls)
934 941
935 942
936 943 class _fncachevfs(vfsmod.proxyvfs):
937 944 def __init__(self, vfs, fnc, encode):
938 945 vfsmod.proxyvfs.__init__(self, vfs)
939 946 self.fncache = fnc
940 947 self.encode = encode
941 948
942 949 def __call__(self, path, mode=b'r', *args, **kw):
943 950 encoded = self.encode(path)
944 951 if (
945 952 mode not in (b'r', b'rb')
946 953 and (path.startswith(b'data/') or path.startswith(b'meta/'))
947 954 and revlog_type(path) is not None
948 955 ):
949 956 # do not trigger a fncache load when adding a file that already is
950 957 # known to exist.
951 958 notload = self.fncache.entries is None and self.vfs.exists(encoded)
952 959 if notload and b'r+' in mode and not self.vfs.stat(encoded).st_size:
953 960 # when appending to an existing file, if the file has size zero,
954 961 # it should be considered as missing. Such zero-size files are
955 962 # the result of truncation when a transaction is aborted.
956 963 notload = False
957 964 if not notload:
958 965 self.fncache.add(path)
959 966 return self.vfs(encoded, mode, *args, **kw)
960 967
961 968 def join(self, path):
962 969 if path:
963 970 return self.vfs.join(self.encode(path))
964 971 else:
965 972 return self.vfs.join(path)
966 973
967 974 def register_file(self, path):
968 975 """generic hook point to lets fncache steer its stew"""
969 976 if path.startswith(b'data/') or path.startswith(b'meta/'):
970 977 self.fncache.add(path)
971 978
972 979
973 980 class fncachestore(basicstore):
974 981 def __init__(self, path, vfstype, dotencode):
975 982 if dotencode:
976 983 encode = _pathencode
977 984 else:
978 985 encode = _plainhybridencode
979 986 self.encode = encode
980 987 vfs = vfstype(path + b'/store')
981 988 self.path = vfs.base
982 989 self.pathsep = self.path + b'/'
983 990 self.createmode = _calcmode(vfs)
984 991 vfs.createmode = self.createmode
985 992 self.rawvfs = vfs
986 993 fnc = fncache(vfs)
987 994 self.fncache = fnc
988 995 self.vfs = _fncachevfs(vfs, fnc, encode)
989 996 self.opener = self.vfs
990 997
991 998 def join(self, f):
992 999 return self.pathsep + self.encode(f)
993 1000
994 1001 def getsize(self, path):
995 1002 return self.rawvfs.stat(path).st_size
996 1003
997 1004 def data_entries(
998 1005 self, matcher=None, undecodable=None
999 1006 ) -> Generator[BaseStoreEntry, None, None]:
1000 1007 files = ((f, revlog_type(f)) for f in self.fncache)
1001 1008 # Note: all files in fncache should be revlog related. However, the
1002 1009 # fncache might contain such files added by a previous version of
1003 1010 # Mercurial.
1004 1011 files = (f for f in files if f[1] is not None)
1005 1012 by_revlog = _gather_revlog(files)
1006 1013 for revlog, details in by_revlog:
1007 1014 file_details = {}
1008 1015 if revlog.startswith(b'data/'):
1009 1016 rl_type = FILEFLAGS_FILELOG
1010 1017 revlog_target_id = revlog.split(b'/', 1)[1]
1011 1018 elif revlog.startswith(b'meta/'):
1012 1019 rl_type = FILEFLAGS_MANIFESTLOG
1013 1020 # drop the initial directory and the `00manifest` file part
1014 1021 tmp = revlog.split(b'/', 1)[1]
1015 1022 revlog_target_id = tmp.rsplit(b'/', 1)[0] + b'/'
1016 1023 else:
1017 1024 # unreachable
1018 1025 assert False, revlog
1019 1026 for ext, t in details.items():
1020 1027 file_details[ext] = {
1021 1028 'is_volatile': bool(t & FILEFLAGS_VOLATILE),
1022 1029 }
1023 1030 entry = RevlogStoreEntry(
1024 1031 path_prefix=revlog,
1025 1032 revlog_type=rl_type,
1026 1033 target_id=revlog_target_id,
1027 1034 details=file_details,
1028 1035 )
1029 1036 if _match_tracked_entry(entry, matcher):
1030 1037 yield entry
1031 1038
1032 1039 def copylist(self):
1033 1040 d = (
1034 1041 b'bookmarks',
1035 1042 b'narrowspec',
1036 1043 b'data',
1037 1044 b'meta',
1038 1045 b'dh',
1039 1046 b'fncache',
1040 1047 b'phaseroots',
1041 1048 b'obsstore',
1042 1049 b'00manifest.d',
1043 1050 b'00manifest.i',
1044 1051 b'00changelog.d',
1045 1052 b'00changelog.i',
1046 1053 b'requires',
1047 1054 )
1048 1055 return [b'requires', b'00changelog.i'] + [b'store/' + f for f in d]
1049 1056
1050 1057 def write(self, tr):
1051 1058 self.fncache.write(tr)
1052 1059
1053 1060 def invalidatecaches(self):
1054 1061 self.fncache.entries = None
1055 1062 self.fncache.addls = set()
1056 1063
1057 1064 def markremoved(self, fn):
1058 1065 self.fncache.remove(fn)
1059 1066
1060 1067 def _exists(self, f):
1061 1068 ef = self.encode(f)
1062 1069 try:
1063 1070 self.getsize(ef)
1064 1071 return True
1065 1072 except FileNotFoundError:
1066 1073 return False
1067 1074
1068 1075 def __contains__(self, path):
1069 1076 '''Checks if the store contains path'''
1070 1077 path = b"/".join((b"data", path))
1071 1078 # check for files (exact match)
1072 1079 e = path + b'.i'
1073 1080 if e in self.fncache and self._exists(e):
1074 1081 return True
1075 1082 # now check for directories (prefix match)
1076 1083 if not path.endswith(b'/'):
1077 1084 path += b'/'
1078 1085 for e in self.fncache:
1079 1086 if e.startswith(path) and self._exists(e):
1080 1087 return True
1081 1088 return False
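In `basicstore.top_entries`, the `phase=True` path appends a `SimpleStoreEntry` for `phaseroots` after the manifest and changelog entries, marked volatile and only emitted when the file exists. A hypothetical way to observe that behaviour on an existing repository object `repo`:

    def has_phaseroots_entry(repo):
        # top_entries(phase=True) yields the phaseroots entry last;
        # it is volatile because its content may change while streaming
        for entry in repo.store.top_entries(phase=True):
            for f in entry.files():
                if f.unencoded_path == b'phaseroots':
                    return True
        return False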
@@ -1,939 +1,936 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import contextlib
10 10 import os
11 11 import struct
12 12
13 13 from .i18n import _
14 14 from .pycompat import open
15 15 from .interfaces import repository
16 16 from . import (
17 17 bookmarks,
18 18 cacheutil,
19 19 error,
20 20 narrowspec,
21 21 phases,
22 22 pycompat,
23 23 requirements as requirementsmod,
24 24 scmutil,
25 25 store,
26 26 transaction,
27 27 util,
28 28 )
29 29 from .revlogutils import (
30 30 nodemap,
31 31 )
32 32
33 33
34 34 def new_stream_clone_requirements(default_requirements, streamed_requirements):
35 35 """determine the final set of requirement for a new stream clone
36 36
37 37 this method combines the "default" requirements that a new repository would
38 38 use with the constraints we get from the stream clone content. We keep local
39 39 configuration choices when possible.
40 40 """
41 41 requirements = set(default_requirements)
42 42 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
43 43 requirements.update(streamed_requirements)
44 44 return requirements
45 45
46 46
47 47 def streamed_requirements(repo):
48 48 """the set of requirement the new clone will have to support
49 49
50 50 This is used for advertising the stream options and to generate the actual
51 51 stream content."""
52 52 requiredformats = (
53 53 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
54 54 )
55 55 return requiredformats
56 56
57 57
58 58 def canperformstreamclone(pullop, bundle2=False):
59 59 """Whether it is possible to perform a streaming clone as part of pull.
60 60
61 61 ``bundle2`` will cause the function to consider stream clone through
62 62 bundle2 and only through bundle2.
63 63
64 64 Returns a tuple of (supported, requirements). ``supported`` is True if
65 65 streaming clone is supported and False otherwise. ``requirements`` is
66 66 a set of repo requirements from the remote, or ``None`` if stream clone
67 67 isn't supported.
68 68 """
69 69 repo = pullop.repo
70 70 remote = pullop.remote
71 71
72 72 bundle2supported = False
73 73 if pullop.canusebundle2:
74 74 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
75 75 bundle2supported = True
76 76 # else
77 77 # Server doesn't support bundle2 stream clone or doesn't support
78 78 # the versions we support. Fall back and possibly allow legacy.
79 79
80 80 # Ensures legacy code path uses available bundle2.
81 81 if bundle2supported and not bundle2:
82 82 return False, None
83 83 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
84 84 elif bundle2 and not bundle2supported:
85 85 return False, None
86 86
87 87 # Streaming clone only works on empty repositories.
88 88 if len(repo):
89 89 return False, None
90 90
91 91 # Streaming clone only works if all data is being requested.
92 92 if pullop.heads:
93 93 return False, None
94 94
95 95 streamrequested = pullop.streamclonerequested
96 96
97 97 # If we don't have a preference, let the server decide for us. This
98 98 # likely only comes into play in LANs.
99 99 if streamrequested is None:
100 100 # The server can advertise whether to prefer streaming clone.
101 101 streamrequested = remote.capable(b'stream-preferred')
102 102
103 103 if not streamrequested:
104 104 return False, None
105 105
106 106 # In order for stream clone to work, the client has to support all the
107 107 # requirements advertised by the server.
108 108 #
109 109 # The server advertises its requirements via the "stream" and "streamreqs"
110 110 # capability. "stream" (a value-less capability) is advertised if and only
111 111 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
112 112 # is advertised and contains a comma-delimited list of requirements.
113 113 requirements = set()
114 114 if remote.capable(b'stream'):
115 115 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
116 116 else:
117 117 streamreqs = remote.capable(b'streamreqs')
118 118 # This is weird and shouldn't happen with modern servers.
119 119 if not streamreqs:
120 120 pullop.repo.ui.warn(
121 121 _(
122 122 b'warning: stream clone requested but server has them '
123 123 b'disabled\n'
124 124 )
125 125 )
126 126 return False, None
127 127
128 128 streamreqs = set(streamreqs.split(b','))
129 129 # Server requires something we don't support. Bail.
130 130 missingreqs = streamreqs - repo.supported
131 131 if missingreqs:
132 132 pullop.repo.ui.warn(
133 133 _(
134 134 b'warning: stream clone requested but client is missing '
135 135 b'requirements: %s\n'
136 136 )
137 137 % b', '.join(sorted(missingreqs))
138 138 )
139 139 pullop.repo.ui.warn(
140 140 _(
141 141 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
142 142 b'for more information)\n'
143 143 )
144 144 )
145 145 return False, None
146 146 requirements = streamreqs
147 147
148 148 return True, requirements
149 149
150 150
151 151 def maybeperformlegacystreamclone(pullop):
152 152 """Possibly perform a legacy stream clone operation.
153 153
154 154 Legacy stream clones are performed as part of pull but before all other
155 155 operations.
156 156
157 157 A legacy stream clone will not be performed if a bundle2 stream clone is
158 158 supported.
159 159 """
160 160 from . import localrepo
161 161
162 162 supported, requirements = canperformstreamclone(pullop)
163 163
164 164 if not supported:
165 165 return
166 166
167 167 repo = pullop.repo
168 168 remote = pullop.remote
169 169
170 170 # Save remote branchmap. We will use it later to speed up branchcache
171 171 # creation.
172 172 rbranchmap = None
173 173 if remote.capable(b'branchmap'):
174 174 with remote.commandexecutor() as e:
175 175 rbranchmap = e.callcommand(b'branchmap', {}).result()
176 176
177 177 repo.ui.status(_(b'streaming all changes\n'))
178 178
179 179 with remote.commandexecutor() as e:
180 180 fp = e.callcommand(b'stream_out', {}).result()
181 181
182 182 # TODO strictly speaking, this code should all be inside the context
183 183 # manager because the context manager is supposed to ensure all wire state
184 184 # is flushed when exiting. But the legacy peers don't do this, so it
185 185 # doesn't matter.
186 186 l = fp.readline()
187 187 try:
188 188 resp = int(l)
189 189 except ValueError:
190 190 raise error.ResponseError(
191 191 _(b'unexpected response from remote server:'), l
192 192 )
193 193 if resp == 1:
194 194 raise error.Abort(_(b'operation forbidden by server'))
195 195 elif resp == 2:
196 196 raise error.Abort(_(b'locking the remote repository failed'))
197 197 elif resp != 0:
198 198 raise error.Abort(_(b'the server sent an unknown error code'))
199 199
200 200 l = fp.readline()
201 201 try:
202 202 filecount, bytecount = map(int, l.split(b' ', 1))
203 203 except (ValueError, TypeError):
204 204 raise error.ResponseError(
205 205 _(b'unexpected response from remote server:'), l
206 206 )
207 207
208 208 with repo.lock():
209 209 consumev1(repo, fp, filecount, bytecount)
210 210 repo.requirements = new_stream_clone_requirements(
211 211 repo.requirements,
212 212 requirements,
213 213 )
214 214 repo.svfs.options = localrepo.resolvestorevfsoptions(
215 215 repo.ui, repo.requirements, repo.features
216 216 )
217 217 scmutil.writereporequirements(repo)
218 218 nodemap.post_stream_cleanup(repo)
219 219
220 220 if rbranchmap:
221 221 repo._branchcaches.replace(repo, rbranchmap)
222 222
223 223 repo.invalidate()
224 224
225 225
226 226 def allowservergeneration(repo):
227 227 """Whether streaming clones are allowed from the server."""
228 228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
229 229 return False
230 230
231 231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
232 232 return False
233 233
234 234 # The way stream clone works makes it impossible to hide secret changesets.
235 235 # So don't allow this by default.
236 236 secret = phases.hassecret(repo)
237 237 if secret:
238 238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
239 239
240 240 return True
241 241
242 242
243 243 # This is its own function so extensions can override it.
244 def _walkstreamfiles(repo, matcher=None):
245 return repo.store.walk(matcher)
244 def _walkstreamfiles(repo, matcher=None, phase=False):
245 return repo.store.walk(matcher, phase=phase)
246 246
247 247
248 248 def generatev1(repo):
249 249 """Emit content for version 1 of a streaming clone.
250 250
251 251 This returns a 3-tuple of (file count, byte size, data iterator).
252 252
253 253 The data iterator consists of N entries for each file being transferred.
254 254 Each file entry starts as a line with the file name and integer size
255 255 delimited by a null byte.
256 256
257 257 The raw file data follows. Following the raw file data is the next file
258 258 entry, or EOF.
259 259
260 260 When used on the wire protocol, an additional line indicating protocol
261 261 success will be prepended to the stream. This function is not responsible
262 262 for adding it.
263 263
264 264 This function will obtain a repository lock to ensure a consistent view of
265 265 the store is captured. It therefore may raise LockError.
266 266 """
267 267 entries = []
268 268 total_bytes = 0
269 269 # Get consistent snapshot of repo, lock during scan.
270 270 with repo.lock():
271 271 repo.ui.debug(b'scanning\n')
272 272 for entry in _walkstreamfiles(repo):
273 273 for f in entry.files():
274 274 file_size = f.file_size(repo.store.vfs)
275 275 if file_size:
276 276 entries.append((f.unencoded_path, file_size))
277 277 total_bytes += file_size
278 278 _test_sync_point_walk_1(repo)
279 279 _test_sync_point_walk_2(repo)
280 280
281 281 repo.ui.debug(
282 282 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
283 283 )
284 284
285 285 svfs = repo.svfs
286 286 debugflag = repo.ui.debugflag
287 287
288 288 def emitrevlogdata():
289 289 for name, size in entries:
290 290 if debugflag:
291 291 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
292 292 # partially encode name over the wire for backwards compat
293 293 yield b'%s\0%d\n' % (store.encodedir(name), size)
294 294 # auditing at this stage is both pointless (paths are already
295 295 # trusted by the local repo) and expensive
296 296 with svfs(name, b'rb', auditpath=False) as fp:
297 297 if size <= 65536:
298 298 yield fp.read(size)
299 299 else:
300 300 for chunk in util.filechunkiter(fp, limit=size):
301 301 yield chunk
302 302
303 303 return len(entries), total_bytes, emitrevlogdata()
304 304
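The entry framing described in the docstring above (file name and size separated by a NUL byte, followed by the raw bytes) can be consumed symmetrically. The following is a hedged sketch, not part of this module, of a minimal reader; it ignores progress reporting and the store.decodedir() step that consumev1() performs:

def parse_v1_entries(fp, filecount):
    # hypothetical helper: yield (name, data) pairs from a version 1
    # stream, assuming `fp` offers readline() and read(size) and that
    # `filecount` was taken from the wire/bundle header
    for _ in range(filecount):
        header = fp.readline()
        name, size = header.rstrip(b'\n').split(b'\0', 1)
        yield name, fp.read(int(size))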
305 305
306 306 def generatev1wireproto(repo):
307 307 """Emit content for version 1 of streaming clone suitable for the wire.
308 308
309 309 This is the data output from ``generatev1()`` with 2 header lines. The
310 310 first line indicates overall success. The 2nd contains the file count and
311 311 byte size of payload.
312 312
313 313 The success line contains "0" for success, "1" for stream generation not
314 314 allowed, and "2" for error locking the repository (possibly indicating
315 315 a permissions error for the server process).
316 316 """
317 317 if not allowservergeneration(repo):
318 318 yield b'1\n'
319 319 return
320 320
321 321 try:
322 322 filecount, bytecount, it = generatev1(repo)
323 323 except error.LockError:
324 324 yield b'2\n'
325 325 return
326 326
327 327 # Indicates successful response.
328 328 yield b'0\n'
329 329 yield b'%d %d\n' % (filecount, bytecount)
330 330 for chunk in it:
331 331 yield chunk
332 332
333 333
334 334 def generatebundlev1(repo, compression=b'UN'):
335 335 """Emit content for version 1 of a stream clone bundle.
336 336
337 337 The first 4 bytes of the output ("HGS1") denote this as stream clone
338 338 bundle version 1.
339 339
340 340 The next 2 bytes indicate the compression type. Only "UN" is currently
341 341 supported.
342 342
343 343 The next 16 bytes are two 64-bit big endian unsigned integers indicating
344 344 file count and byte count, respectively.
345 345
346 346 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
347 347 of the requirements string, including a trailing \0. The following N bytes
348 348 are the requirements string, which is ASCII containing a comma-delimited
349 349 list of repo requirements that are needed to support the data.
350 350
351 351 The remaining content is the output of ``generatev1()`` (which may be
352 352 compressed in the future).
353 353
354 354 Returns a tuple of (requirements, data generator).
355 355 """
356 356 if compression != b'UN':
357 357 raise ValueError(b'we do not support the compression argument yet')
358 358
359 359 requirements = streamed_requirements(repo)
360 360 requires = b','.join(sorted(requirements))
361 361
362 362 def gen():
363 363 yield b'HGS1'
364 364 yield compression
365 365
366 366 filecount, bytecount, it = generatev1(repo)
367 367 repo.ui.status(
368 368 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
369 369 )
370 370
371 371 yield struct.pack(b'>QQ', filecount, bytecount)
372 372 yield struct.pack(b'>H', len(requires) + 1)
373 373 yield requires + b'\0'
374 374
375 375 # This is where we'll add compression in the future.
376 376 assert compression == b'UN'
377 377
378 378 progress = repo.ui.makeprogress(
379 379 _(b'bundle'), total=bytecount, unit=_(b'bytes')
380 380 )
381 381 progress.update(0)
382 382
383 383 for chunk in it:
384 384 progress.increment(step=len(chunk))
385 385 yield chunk
386 386
387 387 progress.complete()
388 388
389 389 return requirements, gen()
390 390
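As a hedged usage sketch (the helper name and file handling below are illustrative, not part of this module), writing an uncompressed stream clone bundle to disk only requires draining the returned generator:

def writebundlev1(repo, path):
    # hypothetical helper: persist a version 1 stream clone bundle
    requirements, gen = generatebundlev1(repo)
    with open(path, 'wb') as fh:
        for chunk in gen:
            fh.write(chunk)
    return requirements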
391 391
392 392 def consumev1(repo, fp, filecount, bytecount):
393 393 """Apply the contents from version 1 of a streaming clone file handle.
394 394
395 395 This takes the output from "stream_out" and applies it to the specified
396 396 repository.
397 397
398 398 Like "stream_out," the status line added by the wire protocol is not
399 399 handled by this function.
400 400 """
401 401 with repo.lock():
402 402 repo.ui.status(
403 403 _(b'%d files to transfer, %s of data\n')
404 404 % (filecount, util.bytecount(bytecount))
405 405 )
406 406 progress = repo.ui.makeprogress(
407 407 _(b'clone'), total=bytecount, unit=_(b'bytes')
408 408 )
409 409 progress.update(0)
410 410 start = util.timer()
411 411
412 412 # TODO: get rid of (potential) inconsistency
413 413 #
414 414 # If transaction is started and any @filecache property is
415 415 # changed at this point, it causes inconsistency between
416 416 # in-memory cached property and streamclone-ed file on the
417 417 # disk. Nested transaction prevents transaction scope "clone"
418 418 # below from writing in-memory changes out at the end of it,
419 419 # even though in-memory changes are discarded at the end of it
420 420 # regardless of transaction nesting.
421 421 #
422 422 # But transaction nesting can't be simply prohibited, because
423 423 # nesting occurs also in ordinary case (e.g. enabling
424 424 # clonebundles).
425 425
426 426 with repo.transaction(b'clone'):
427 427 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
428 428 for i in range(filecount):
429 429 # XXX doesn't support '\n' or '\r' in filenames
430 430 l = fp.readline()
431 431 try:
432 432 name, size = l.split(b'\0', 1)
433 433 size = int(size)
434 434 except (ValueError, TypeError):
435 435 raise error.ResponseError(
436 436 _(b'unexpected response from remote server:'), l
437 437 )
438 438 if repo.ui.debugflag:
439 439 repo.ui.debug(
440 440 b'adding %s (%s)\n' % (name, util.bytecount(size))
441 441 )
442 442 # for backwards compat, name was partially encoded
443 443 path = store.decodedir(name)
444 444 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
445 445 for chunk in util.filechunkiter(fp, limit=size):
446 446 progress.increment(step=len(chunk))
447 447 ofp.write(chunk)
448 448
449 449 # force @filecache properties to be reloaded from
450 450 # streamclone-ed file at next access
451 451 repo.invalidate(clearfilecache=True)
452 452
453 453 elapsed = util.timer() - start
454 454 if elapsed <= 0:
455 455 elapsed = 0.001
456 456 progress.complete()
457 457 repo.ui.status(
458 458 _(b'transferred %s in %.1f seconds (%s/sec)\n')
459 459 % (
460 460 util.bytecount(bytecount),
461 461 elapsed,
462 462 util.bytecount(bytecount / elapsed),
463 463 )
464 464 )
465 465
466 466
467 467 def readbundle1header(fp):
468 468 compression = fp.read(2)
469 469 if compression != b'UN':
470 470 raise error.Abort(
471 471 _(
472 472 b'only uncompressed stream clone bundles are '
473 473 b'supported; got %s'
474 474 )
475 475 % compression
476 476 )
477 477
478 478 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
479 479 requireslen = struct.unpack(b'>H', fp.read(2))[0]
480 480 requires = fp.read(requireslen)
481 481
482 482 if not requires.endswith(b'\0'):
483 483 raise error.Abort(
484 484 _(
485 485 b'malformed stream clone bundle: '
486 486 b'requirements not properly encoded'
487 487 )
488 488 )
489 489
490 490 requirements = set(requires.rstrip(b'\0').split(b','))
491 491
492 492 return filecount, bytecount, requirements
493 493
494 494
495 495 def applybundlev1(repo, fp):
496 496 """Apply the content from a stream clone bundle version 1.
497 497
498 498 We assume the 4 byte header has been read and validated and the file handle
499 499 is at the 2 byte compression identifier.
500 500 """
501 501 if len(repo):
502 502 raise error.Abort(
503 503 _(b'cannot apply stream clone bundle on non-empty repo')
504 504 )
505 505
506 506 filecount, bytecount, requirements = readbundle1header(fp)
507 507 missingreqs = requirements - repo.supported
508 508 if missingreqs:
509 509 raise error.Abort(
510 510 _(b'unable to apply stream clone: unsupported format: %s')
511 511 % b', '.join(sorted(missingreqs))
512 512 )
513 513
514 514 consumev1(repo, fp, filecount, bytecount)
515 515 nodemap.post_stream_cleanup(repo)
516 516
517 517
518 518 class streamcloneapplier:
519 519 """Class to manage applying streaming clone bundles.
520 520
521 521 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
522 522 readers to perform bundle type-specific functionality.
523 523 """
524 524
525 525 def __init__(self, fh):
526 526 self._fh = fh
527 527
528 528 def apply(self, repo):
529 529 return applybundlev1(repo, self._fh)
530 530
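A hedged usage sketch (the wrapper below is illustrative): callers are expected to consume and validate the 4-byte magic themselves, so that the applier starts at the compression identifier as the applybundlev1() contract above requires:

def apply_stream_bundle(repo, fh):
    # hypothetical helper: `fh` is positioned at the start of the bundle
    if fh.read(4) != b'HGS1':
        raise ValueError('not a version 1 stream clone bundle')
    # the remaining stream now starts at the 2-byte compression id
    return streamcloneapplier(fh).apply(repo)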
531 531
532 532 # type of file to stream
533 533 _fileappend = 0 # append only file
534 534 _filefull = 1 # full snapshot file
535 535
536 536 # Source of the file
537 537 _srcstore = b's' # store (svfs)
538 538 _srccache = b'c' # cache (cache)
539 539
540 540 # This is its own function so extensions can override it.
541 541 def _walkstreamfullstorefiles(repo):
542 542 """list snapshot file from the store"""
543 543 fnames = []
544 544 if not repo.publishing():
545 545 fnames.append(b'phaseroots')
546 546 return fnames
547 547
548 548
549 549 def _filterfull(entry, copy, vfsmap):
550 550 """actually copy the snapshot files"""
551 551 src, name, ftype, data = entry
552 552 if ftype != _filefull:
553 553 return entry
554 554 return (src, name, ftype, copy(vfsmap[src].join(name)))
555 555
556 556
557 557 @contextlib.contextmanager
558 558 def maketempcopies():
559 559 """return a function to temporary copy file"""
560 560
561 561 files = []
562 562 dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
563 563 try:
564 564
565 565 def copy(src):
566 566 fd, dst = pycompat.mkstemp(
567 567 prefix=os.path.basename(src), dir=dst_dir
568 568 )
569 569 os.close(fd)
570 570 files.append(dst)
571 571 util.copyfiles(src, dst, hardlink=True)
572 572 return dst
573 573
574 574 yield copy
575 575 finally:
576 576 for tmp in files:
577 577 util.tryunlink(tmp)
578 578 util.tryrmdir(dst_dir)
579 579
580 580
581 581 def _makemap(repo):
582 582 """make a (src -> vfs) map for the repo"""
583 583 vfsmap = {
584 584 _srcstore: repo.svfs,
585 585 _srccache: repo.cachevfs,
586 586 }
587 587 # we keep repo.vfs out of the map on purpose, there are too many dangers
588 588 # there (eg: .hg/hgrc)
589 589 assert repo.vfs not in vfsmap.values()
590 590
591 591 return vfsmap
592 592
593 593
594 594 def _emit2(repo, entries, totalfilesize):
595 595 """actually emit the stream bundle"""
596 596 vfsmap = _makemap(repo)
597 597 # we keep repo.vfs out of the map on purpose, there are too many dangers
598 598 # there (eg: .hg/hgrc),
599 599 #
600 600 # this assert is duplicated (from _makemap) as an author might think this
601 601 # is fine, while it really is not.
602 602 if repo.vfs in vfsmap.values():
603 603 raise error.ProgrammingError(
604 604 b'repo.vfs must not be added to vfsmap for security reasons'
605 605 )
606 606
607 607 progress = repo.ui.makeprogress(
608 608 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
609 609 )
610 610 progress.update(0)
611 611 with maketempcopies() as copy, progress:
612 612 # copy is delayed until we are in the try
613 613 entries = [_filterfull(e, copy, vfsmap) for e in entries]
614 614 yield None # this release the lock on the repository
615 615 totalbytecount = 0
616 616
617 617 for src, name, ftype, data in entries:
618 618 vfs = vfsmap[src]
619 619 yield src
620 620 yield util.uvarintencode(len(name))
621 621 if ftype == _fileappend:
622 622 fp = vfs(name)
623 623 size = data
624 624 elif ftype == _filefull:
625 625 fp = open(data, b'rb')
626 626 size = util.fstat(fp).st_size
627 627 bytecount = 0
628 628 try:
629 629 yield util.uvarintencode(size)
630 630 yield name
631 631 if size <= 65536:
632 632 chunks = (fp.read(size),)
633 633 else:
634 634 chunks = util.filechunkiter(fp, limit=size)
635 635 for chunk in chunks:
636 636 bytecount += len(chunk)
637 637 totalbytecount += len(chunk)
638 638 progress.update(totalbytecount)
639 639 yield chunk
640 640 if bytecount != size:
641 641 # Would most likely be caused by a race due to `hg strip` or
642 642 # a revlog split
643 643 raise error.Abort(
644 644 _(
645 645 b'clone could only read %d bytes from %s, but '
646 646 b'expected %d bytes'
647 647 )
648 648 % (bytecount, name, size)
649 649 )
650 650 finally:
651 651 fp.close()
652 652
653 653
654 654 def _test_sync_point_walk_1(repo):
655 655 """a function for synchronisation during tests"""
656 656
657 657
658 658 def _test_sync_point_walk_2(repo):
659 659 """a function for synchronisation during tests"""
660 660
661 661
662 662 def _v2_walk(repo, includes, excludes, includeobsmarkers):
663 663 """emit a seris of files information useful to clone a repo
664 664
665 665 return (entries, totalfilesize)
666 666
667 667 entries is a list of tuples (vfs-key, file-path, file-type, size)
668 668
669 669 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
670 670 - `name`: file path of the file to copy (to be fed to the vfs)
671 671 - `file-type`: does this file need to be copied with the source lock held?
672 672 - `size`: the size of the file (or None)
673 673 """
674 674 assert repo._currentlock(repo._lockref) is not None
675 675 entries = []
676 676 totalfilesize = 0
677 677
678 678 matcher = None
679 679 if includes or excludes:
680 680 matcher = narrowspec.match(repo.root, includes, excludes)
681 681
682 for entry in _walkstreamfiles(repo, matcher):
682 phase = not repo.publishing()
683 for entry in _walkstreamfiles(repo, matcher, phase=phase):
683 684 for f in entry.files():
684 685 file_size = f.file_size(repo.store.vfs)
685 686 if file_size:
686 687 ft = _fileappend
687 688 if f.is_volatile:
688 689 ft = _filefull
689 690 entries.append((_srcstore, f.unencoded_path, ft, file_size))
690 691 totalfilesize += file_size
691 for name in _walkstreamfullstorefiles(repo):
692 if repo.svfs.exists(name):
693 totalfilesize += repo.svfs.lstat(name).st_size
694 entries.append((_srcstore, name, _filefull, None))
695 692 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
696 693 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
697 694 entries.append((_srcstore, b'obsstore', _filefull, None))
698 695 for name in cacheutil.cachetocopy(repo):
699 696 if repo.cachevfs.exists(name):
700 697 totalfilesize += repo.cachevfs.lstat(name).st_size
701 698 entries.append((_srccache, name, _filefull, None))
702 699 return entries, totalfilesize
703 700
704 701
705 702 def generatev2(repo, includes, excludes, includeobsmarkers):
706 703 """Emit content for version 2 of a streaming clone.
707 704
708 705 the data stream consists of the following entries:
709 706 1) A char representing the file destination (eg: store or cache)
710 707 2) A varint containing the length of the filename
711 708 3) A varint containing the length of file data
712 709 4) N bytes containing the filename (the internal, store-agnostic form)
713 710 5) N bytes containing the file data
714 711
715 712 Returns a 3-tuple of (file count, file size, data iterator).
716 713 """
717 714
718 715 with repo.lock():
719 716
720 717 repo.ui.debug(b'scanning\n')
721 718
722 719 entries, totalfilesize = _v2_walk(
723 720 repo,
724 721 includes=includes,
725 722 excludes=excludes,
726 723 includeobsmarkers=includeobsmarkers,
727 724 )
728 725
729 726 chunks = _emit2(repo, entries, totalfilesize)
730 727 first = next(chunks)
731 728 assert first is None
732 729 _test_sync_point_walk_1(repo)
733 730 _test_sync_point_walk_2(repo)
734 731
735 732 return len(entries), totalfilesize, chunks
736 733
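The per-entry framing listed in the generatev2() docstring mirrors what _emit2() produces and consumev2() reads. As a hedged illustration (the helper name is hypothetical), a single entry could be framed with the same util helpers used above:

def frame_v2_entry(src, name, data):
    # hypothetical helper: build the wire framing for one stream v2 entry
    assert src in (_srcstore, _srccache)    # 1) destination byte
    return b''.join([
        src,
        util.uvarintencode(len(name)),      # 2) varint filename length
        util.uvarintencode(len(data)),      # 3) varint data length
        name,                               # 4) store-agnostic filename
        data,                               # 5) raw file data
    ])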
737 734
738 735 @contextlib.contextmanager
739 736 def nested(*ctxs):
740 737 this = ctxs[0]
741 738 rest = ctxs[1:]
742 739 with this:
743 740 if rest:
744 741 with nested(*rest):
745 742 yield
746 743 else:
747 744 yield
748 745
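nested() simply enters the given context managers in order and unwinds them in reverse. A hedged equivalent built on the standard library, shown only for comparison and not used by this module, would be:

@contextlib.contextmanager
def nested_with_exitstack(*ctxs):
    # hypothetical alternative: enter every context on an ExitStack,
    # which also unwinds them in reverse order on exit
    with contextlib.ExitStack() as stack:
        for ctx in ctxs:
            stack.enter_context(ctx)
        yield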
749 746
750 747 def consumev2(repo, fp, filecount, filesize):
751 748 """Apply the contents from a version 2 streaming clone.
752 749
753 750 Data is read from an object that only needs to provide a ``read(size)``
754 751 method.
755 752 """
756 753 with repo.lock():
757 754 repo.ui.status(
758 755 _(b'%d files to transfer, %s of data\n')
759 756 % (filecount, util.bytecount(filesize))
760 757 )
761 758
762 759 start = util.timer()
763 760 progress = repo.ui.makeprogress(
764 761 _(b'clone'), total=filesize, unit=_(b'bytes')
765 762 )
766 763 progress.update(0)
767 764
768 765 vfsmap = _makemap(repo)
769 766 # we keep repo.vfs out of the map on purpose, there are too many
770 767 # dangers there (eg: .hg/hgrc),
771 768 #
772 769 # this assert is duplicated (from _makemap) as an author might think this
773 770 # is fine, while it really is not.
774 771 if repo.vfs in vfsmap.values():
775 772 raise error.ProgrammingError(
776 773 b'repo.vfs must not be added to vfsmap for security reasons'
777 774 )
778 775
779 776 with repo.transaction(b'clone'):
780 777 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
781 778 with nested(*ctxs):
782 779 for i in range(filecount):
783 780 src = util.readexactly(fp, 1)
784 781 vfs = vfsmap[src]
785 782 namelen = util.uvarintdecodestream(fp)
786 783 datalen = util.uvarintdecodestream(fp)
787 784
788 785 name = util.readexactly(fp, namelen)
789 786
790 787 if repo.ui.debugflag:
791 788 repo.ui.debug(
792 789 b'adding [%s] %s (%s)\n'
793 790 % (src, name, util.bytecount(datalen))
794 791 )
795 792
796 793 with vfs(name, b'w') as ofp:
797 794 for chunk in util.filechunkiter(fp, limit=datalen):
798 795 progress.increment(step=len(chunk))
799 796 ofp.write(chunk)
800 797
801 798 # force @filecache properties to be reloaded from
802 799 # streamclone-ed file at next access
803 800 repo.invalidate(clearfilecache=True)
804 801
805 802 elapsed = util.timer() - start
806 803 if elapsed <= 0:
807 804 elapsed = 0.001
808 805 repo.ui.status(
809 806 _(b'transferred %s in %.1f seconds (%s/sec)\n')
810 807 % (
811 808 util.bytecount(progress.pos),
812 809 elapsed,
813 810 util.bytecount(progress.pos / elapsed),
814 811 )
815 812 )
816 813 progress.complete()
817 814
818 815
819 816 def applybundlev2(repo, fp, filecount, filesize, requirements):
820 817 from . import localrepo
821 818
822 819 missingreqs = [r for r in requirements if r not in repo.supported]
823 820 if missingreqs:
824 821 raise error.Abort(
825 822 _(b'unable to apply stream clone: unsupported format: %s')
826 823 % b', '.join(sorted(missingreqs))
827 824 )
828 825
829 826 consumev2(repo, fp, filecount, filesize)
830 827
831 828 repo.requirements = new_stream_clone_requirements(
832 829 repo.requirements,
833 830 requirements,
834 831 )
835 832 repo.svfs.options = localrepo.resolvestorevfsoptions(
836 833 repo.ui, repo.requirements, repo.features
837 834 )
838 835 scmutil.writereporequirements(repo)
839 836 nodemap.post_stream_cleanup(repo)
840 837
841 838
842 839 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
843 840 hardlink = [True]
844 841
845 842 def copy_used():
846 843 hardlink[0] = False
847 844 progress.topic = _(b'copying')
848 845
849 846 for k, path, size in entries:
850 847 src_vfs = src_vfs_map[k]
851 848 dst_vfs = dst_vfs_map[k]
852 849 src_path = src_vfs.join(path)
853 850 dst_path = dst_vfs.join(path)
854 851 # We cannot use dirname and makedirs of dst_vfs here because the store
855 852 # encoding confuses them. See issue 6581 for details.
856 853 dirname = os.path.dirname(dst_path)
857 854 if not os.path.exists(dirname):
858 855 util.makedirs(dirname)
859 856 dst_vfs.register_file(path)
860 857 # XXX we could use the #nb_bytes argument.
861 858 util.copyfile(
862 859 src_path,
863 860 dst_path,
864 861 hardlink=hardlink[0],
865 862 no_hardlink_cb=copy_used,
866 863 check_fs_hardlink=False,
867 864 )
868 865 progress.increment()
869 866 return hardlink[0]
870 867
871 868
872 869 def local_copy(src_repo, dest_repo):
873 870 """copy all content from one local repository to another
874 871
875 872 This is useful for local clone"""
876 873 src_store_requirements = {
877 874 r
878 875 for r in src_repo.requirements
879 876 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
880 877 }
881 878 dest_store_requirements = {
882 879 r
883 880 for r in dest_repo.requirements
884 881 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
885 882 }
886 883 assert src_store_requirements == dest_store_requirements
887 884
888 885 with dest_repo.lock():
889 886 with src_repo.lock():
890 887
891 888 # bookmarks are not integrated into the streaming as they might use
892 889 # `repo.vfs`, and there is too much sensitive data accessible
893 890 # through `repo.vfs` to expose it to streaming clone.
894 891 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
895 892 srcbookmarks = src_book_vfs.join(b'bookmarks')
896 893 bm_count = 0
897 894 if os.path.exists(srcbookmarks):
898 895 bm_count = 1
899 896
900 897 entries, totalfilesize = _v2_walk(
901 898 src_repo,
902 899 includes=None,
903 900 excludes=None,
904 901 includeobsmarkers=True,
905 902 )
906 903 src_vfs_map = _makemap(src_repo)
907 904 dest_vfs_map = _makemap(dest_repo)
908 905 progress = src_repo.ui.makeprogress(
909 906 topic=_(b'linking'),
910 907 total=len(entries) + bm_count,
911 908 unit=_(b'files'),
912 909 )
913 910 # copy files
914 911 #
915 912 # We could copy the full file while the source repository is locked
916 913 # and the other one without the lock. However, in the linking case,
917 914 # this would also require checks that nobody is appending any data
918 915 # to the files while we do the clone, so this is not done yet. We
919 916 # could do this blindly when copying files.
920 917 files = ((k, path, size) for k, path, ftype, size in entries)
921 918 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
922 919
923 920 # copy bookmarks over
924 921 if bm_count:
925 922 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
926 923 dstbookmarks = dst_book_vfs.join(b'bookmarks')
927 924 util.copyfile(srcbookmarks, dstbookmarks)
928 925 progress.complete()
929 926 if hardlink:
930 927 msg = b'linked %d files\n'
931 928 else:
932 929 msg = b'copied %d files\n'
933 930 src_repo.ui.debug(msg % (len(entries) + bm_count))
934 931
935 932 with dest_repo.transaction(b"localclone") as tr:
936 933 dest_repo.store.write(tr)
937 934
938 935 # clean up transaction files as they do not make sense
939 936 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)