store: make `walk` return an entry for obsolescence if requested so...
marmoute
r51407:0925eaf0 default
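A minimal sketch of how the new keyword can be exercised end to end, assuming a repository opened through Mercurial's Python API; the path is a placeholder and the snippet is illustrative rather than part of the patch. `obsolescence` defaults to False at every level, so existing callers are unchanged:

    from mercurial import hg, ui as uimod

    repo = hg.repository(uimod.ui.load(), b'/path/to/repo')  # placeholder path
    with repo.lock():  # same consistent-snapshot locking generatev1 uses
        for entry in repo.store.walk(phase=True, obsolescence=True):
            for f in entry.files():
                # obsstore appears as a volatile SimpleStoreEntry when requested
                print(f.unencoded_path, f.file_size(repo.store.vfs))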
@@ -1,442 +1,446 b''
1 1 # remotefilelogserver.py - server logic for a remotefilelog server
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import os
9 9 import stat
10 10 import time
11 11 import zlib
12 12
13 13 from mercurial.i18n import _
14 14 from mercurial.node import bin, hex
15 15 from mercurial.pycompat import open
16 16 from mercurial import (
17 17 changegroup,
18 18 changelog,
19 19 context,
20 20 error,
21 21 extensions,
22 22 match,
23 23 scmutil,
24 24 store,
25 25 streamclone,
26 26 util,
27 27 wireprotoserver,
28 28 wireprototypes,
29 29 wireprotov1server,
30 30 )
31 31 from . import (
32 32 constants,
33 33 shallowutil,
34 34 )
35 35
36 36 _sshv1server = wireprotoserver.sshv1protocolhandler
37 37
38 38
39 39 def setupserver(ui, repo):
40 40 """Sets up a normal Mercurial repo so it can serve files to shallow repos."""
41 41 onetimesetup(ui)
42 42
43 43 # don't send files to shallow clients during pulls
44 44 def generatefiles(
45 45 orig, self, changedfiles, linknodes, commonrevs, source, *args, **kwargs
46 46 ):
47 47 caps = self._bundlecaps or []
48 48 if constants.BUNDLE2_CAPABLITY in caps:
49 49 # only send files that don't match the specified patterns
50 50 includepattern = None
51 51 excludepattern = None
52 52 for cap in self._bundlecaps or []:
53 53 if cap.startswith(b"includepattern="):
54 54 includepattern = cap[len(b"includepattern=") :].split(b'\0')
55 55 elif cap.startswith(b"excludepattern="):
56 56 excludepattern = cap[len(b"excludepattern=") :].split(b'\0')
57 57
58 58 m = match.always()
59 59 if includepattern or excludepattern:
60 60 m = match.match(
61 61 repo.root, b'', None, includepattern, excludepattern
62 62 )
63 63
64 64 changedfiles = list([f for f in changedfiles if not m(f)])
65 65 return orig(
66 66 self, changedfiles, linknodes, commonrevs, source, *args, **kwargs
67 67 )
68 68
69 69 extensions.wrapfunction(
70 70 changegroup.cgpacker, b'generatefiles', generatefiles
71 71 )
72 72
73 73
74 74 onetime = False
75 75
76 76
77 77 def onetimesetup(ui):
78 78 """Configures the wireprotocol for both clients and servers."""
79 79 global onetime
80 80 if onetime:
81 81 return
82 82 onetime = True
83 83
84 84 # support file content requests
85 85 wireprotov1server.wireprotocommand(
86 86 b'x_rfl_getflogheads', b'path', permission=b'pull'
87 87 )(getflogheads)
88 88 wireprotov1server.wireprotocommand(
89 89 b'x_rfl_getfiles', b'', permission=b'pull'
90 90 )(getfiles)
91 91 wireprotov1server.wireprotocommand(
92 92 b'x_rfl_getfile', b'file node', permission=b'pull'
93 93 )(getfile)
94 94
95 95 class streamstate:
96 96 match = None
97 97 shallowremote = False
98 98 noflatmf = False
99 99
100 100 state = streamstate()
101 101
102 102 def stream_out_shallow(repo, proto, other):
103 103 includepattern = None
104 104 excludepattern = None
105 105 raw = other.get(b'includepattern')
106 106 if raw:
107 107 includepattern = raw.split(b'\0')
108 108 raw = other.get(b'excludepattern')
109 109 if raw:
110 110 excludepattern = raw.split(b'\0')
111 111
112 112 oldshallow = state.shallowremote
113 113 oldmatch = state.match
114 114 oldnoflatmf = state.noflatmf
115 115 try:
116 116 state.shallowremote = True
117 117 state.match = match.always()
118 118 state.noflatmf = other.get(b'noflatmanifest') == b'True'
119 119 if includepattern or excludepattern:
120 120 state.match = match.match(
121 121 repo.root, b'', None, includepattern, excludepattern
122 122 )
123 123 streamres = wireprotov1server.stream(repo, proto)
124 124
125 125 # Force the first value to execute, so the file list is computed
126 126 # within the try/finally scope
127 127 first = next(streamres.gen)
128 128 second = next(streamres.gen)
129 129
130 130 def gen():
131 131 yield first
132 132 yield second
133 133 for value in streamres.gen:
134 134 yield value
135 135
136 136 return wireprototypes.streamres(gen())
137 137 finally:
138 138 state.shallowremote = oldshallow
139 139 state.match = oldmatch
140 140 state.noflatmf = oldnoflatmf
141 141
142 142 wireprotov1server.commands[b'stream_out_shallow'] = (
143 143 stream_out_shallow,
144 144 b'*',
145 145 )
146 146
147 147 # don't clone filelogs to shallow clients
148 def _walkstreamfiles(orig, repo, matcher=None, phase=False):
148 def _walkstreamfiles(
149 orig, repo, matcher=None, phase=False, obsolescence=False
150 ):
149 151 if state.shallowremote:
150 152 # if we are shallow ourselves, stream our local commits
151 153 if shallowutil.isenabled(repo):
152 154 striplen = len(repo.store.path) + 1
153 155 readdir = repo.store.rawvfs.readdir
154 156 visit = [os.path.join(repo.store.path, b'data')]
155 157 while visit:
156 158 p = visit.pop()
157 159 for f, kind, st in readdir(p, stat=True):
158 160 fp = p + b'/' + f
159 161 if kind == stat.S_IFREG:
160 162 if not fp.endswith(b'.i') and not fp.endswith(
161 163 b'.d'
162 164 ):
163 165 n = util.pconvert(fp[striplen:])
164 166 d = store.decodedir(n)
165 167 yield store.SimpleStoreEntry(
166 168 entry_path=d,
167 169 is_volatile=False,
168 170 file_size=st.st_size,
169 171 )
170 172
171 173 if kind == stat.S_IFDIR:
172 174 visit.append(fp)
173 175
174 176 if scmutil.istreemanifest(repo):
175 177 for entry in repo.store.data_entries():
176 178 if not entry.is_revlog:
177 179 continue
178 180 if entry.is_manifestlog:
179 181 yield entry
180 182
181 183 # Return .d and .i files that do not match the shallow pattern
182 184 match = state.match
183 185 if match and not match.always():
184 186 for entry in repo.store.data_entries():
185 187 if not entry.is_revlog:
186 188 continue
187 189 if not state.match(entry.target_id):
188 190 yield entry
189 191
190 192 for x in repo.store.top_entries():
191 193 if state.noflatmf and x[1][:11] == b'00manifest.':
192 194 continue
193 195 yield x
194 196
195 197 elif shallowutil.isenabled(repo):
196 198 # don't allow cloning from a shallow repo to a full repo
197 199 # since it would require fetching every version of every
198 200 # file in order to create the revlogs.
199 201 raise error.Abort(
200 202 _(b"Cannot clone from a shallow repo to a full repo.")
201 203 )
202 204 else:
203 for x in orig(repo, matcher, phase=phase):
205 for x in orig(
206 repo, matcher, phase=phase, obsolescence=obsolescence
207 ):
204 208 yield x
205 209
206 210 extensions.wrapfunction(streamclone, b'_walkstreamfiles', _walkstreamfiles)
207 211
208 212 # expose remotefilelog capabilities
209 213 def _capabilities(orig, repo, proto):
210 214 caps = orig(repo, proto)
211 215 if shallowutil.isenabled(repo) or ui.configbool(
212 216 b'remotefilelog', b'server'
213 217 ):
214 218 if isinstance(proto, _sshv1server):
215 219 # legacy getfiles method which only works over ssh
216 220 caps.append(constants.NETWORK_CAP_LEGACY_SSH_GETFILES)
217 221 caps.append(b'x_rfl_getflogheads')
218 222 caps.append(b'x_rfl_getfile')
219 223 return caps
220 224
221 225 extensions.wrapfunction(wireprotov1server, b'_capabilities', _capabilities)
222 226
223 227 def _adjustlinkrev(orig, self, *args, **kwargs):
224 228 # When generating file blobs, taking the real path is too slow on large
225 229 # repos, so force it to just return the linkrev directly.
226 230 repo = self._repo
227 231 if util.safehasattr(repo, b'forcelinkrev') and repo.forcelinkrev:
228 232 return self._filelog.linkrev(self._filelog.rev(self._filenode))
229 233 return orig(self, *args, **kwargs)
230 234
231 235 extensions.wrapfunction(
232 236 context.basefilectx, b'_adjustlinkrev', _adjustlinkrev
233 237 )
234 238
235 239 def _iscmd(orig, cmd):
236 240 if cmd == b'x_rfl_getfiles':
237 241 return False
238 242 return orig(cmd)
239 243
240 244 extensions.wrapfunction(wireprotoserver, b'iscmd', _iscmd)
241 245
242 246
243 247 def _loadfileblob(repo, cachepath, path, node):
244 248 filecachepath = os.path.join(cachepath, path, hex(node))
245 249 if not os.path.exists(filecachepath) or os.path.getsize(filecachepath) == 0:
246 250 filectx = repo.filectx(path, fileid=node)
247 251 if filectx.node() == repo.nullid:
248 252 repo.changelog = changelog.changelog(repo.svfs)
249 253 filectx = repo.filectx(path, fileid=node)
250 254
251 255 text = createfileblob(filectx)
252 256 # TODO configurable compression engines
253 257 text = zlib.compress(text)
254 258
255 259 # everything should be user & group read/writable
256 260 oldumask = os.umask(0o002)
257 261 try:
258 262 dirname = os.path.dirname(filecachepath)
259 263 if not os.path.exists(dirname):
260 264 try:
261 265 os.makedirs(dirname)
262 266 except FileExistsError:
263 267 pass
264 268
265 269 f = None
266 270 try:
267 271 f = util.atomictempfile(filecachepath, b"wb")
268 272 f.write(text)
269 273 except (IOError, OSError):
270 274 # Don't abort if the user only has permission to read,
271 275 # and not write.
272 276 pass
273 277 finally:
274 278 if f:
275 279 f.close()
276 280 finally:
277 281 os.umask(oldumask)
278 282 else:
279 283 with open(filecachepath, b"rb") as f:
280 284 text = f.read()
281 285 return text
282 286
283 287
284 288 def getflogheads(repo, proto, path):
285 289 """A server api for requesting a filelog's heads"""
286 290 flog = repo.file(path)
287 291 heads = flog.heads()
288 292 return b'\n'.join((hex(head) for head in heads if head != repo.nullid))
289 293
290 294
291 295 def getfile(repo, proto, file, node):
292 296 """A server api for requesting a particular version of a file. Can be used
293 297 in batches to request many files at once. The return protocol is:
294 298 <errorcode>\0<data/errormsg> where <errorcode> is 0 for success or
295 299 non-zero for an error.
296 300
297 301 data is a compressed blob with revlog flag and ancestors information. See
298 302 createfileblob for its content.
299 303 """
300 304 if shallowutil.isenabled(repo):
301 305 return b'1\0' + _(b'cannot fetch remote files from shallow repo')
302 306 cachepath = repo.ui.config(b"remotefilelog", b"servercachepath")
303 307 if not cachepath:
304 308 cachepath = os.path.join(repo.path, b"remotefilelogcache")
305 309 node = bin(node.strip())
306 310 if node == repo.nullid:
307 311 return b'0\0'
308 312 return b'0\0' + _loadfileblob(repo, cachepath, file, node)
309 313
310 314
311 315 def getfiles(repo, proto):
312 316 """A server api for requesting particular versions of particular files."""
313 317 if shallowutil.isenabled(repo):
314 318 raise error.Abort(_(b'cannot fetch remote files from shallow repo'))
315 319 if not isinstance(proto, _sshv1server):
316 320 raise error.Abort(_(b'cannot fetch remote files over non-ssh protocol'))
317 321
318 322 def streamer():
319 323 fin = proto._fin
320 324
321 325 cachepath = repo.ui.config(b"remotefilelog", b"servercachepath")
322 326 if not cachepath:
323 327 cachepath = os.path.join(repo.path, b"remotefilelogcache")
324 328
325 329 while True:
326 330 request = fin.readline()[:-1]
327 331 if not request:
328 332 break
329 333
330 334 node = bin(request[:40])
331 335 if node == repo.nullid:
332 336 yield b'0\n'
333 337 continue
334 338
335 339 path = request[40:]
336 340
337 341 text = _loadfileblob(repo, cachepath, path, node)
338 342
339 343 yield b'%d\n%s' % (len(text), text)
340 344
341 345 # it would be better to only flush after processing a whole batch
342 346 # but currently we don't know if there are more requests coming
343 347 proto._fout.flush()
344 348
345 349 return wireprototypes.streamres(streamer())
346 350
347 351
348 352 def createfileblob(filectx):
349 353 """
350 354 format:
351 355 v0:
352 356 str(len(rawtext)) + '\0' + rawtext + ancestortext
353 357 v1:
354 358 'v1' + '\n' + metalist + '\0' + rawtext + ancestortext
355 359 metalist := metalist + '\n' + meta | meta
356 360 meta := sizemeta | flagmeta
357 361 sizemeta := METAKEYSIZE + str(len(rawtext))
358 362 flagmeta := METAKEYFLAG + str(flag)
359 363
360 364 note: sizemeta must exist. METAKEYFLAG and METAKEYSIZE must have a
361 365 length of 1.
362 366 """
363 367 flog = filectx.filelog()
364 368 frev = filectx.filerev()
365 369 revlogflags = flog._revlog.flags(frev)
366 370 if revlogflags == 0:
367 371 # normal files
368 372 text = filectx.data()
369 373 else:
370 374 # lfs, read raw revision data
371 375 text = flog.rawdata(frev)
372 376
373 377 repo = filectx._repo
374 378
375 379 ancestors = [filectx]
376 380
377 381 try:
378 382 repo.forcelinkrev = True
379 383 ancestors.extend([f for f in filectx.ancestors()])
380 384
381 385 ancestortext = b""
382 386 for ancestorctx in ancestors:
383 387 parents = ancestorctx.parents()
384 388 p1 = repo.nullid
385 389 p2 = repo.nullid
386 390 if len(parents) > 0:
387 391 p1 = parents[0].filenode()
388 392 if len(parents) > 1:
389 393 p2 = parents[1].filenode()
390 394
391 395 copyname = b""
392 396 rename = ancestorctx.renamed()
393 397 if rename:
394 398 copyname = rename[0]
395 399 linknode = ancestorctx.node()
396 400 ancestortext += b"%s%s%s%s%s\0" % (
397 401 ancestorctx.filenode(),
398 402 p1,
399 403 p2,
400 404 linknode,
401 405 copyname,
402 406 )
403 407 finally:
404 408 repo.forcelinkrev = False
405 409
406 410 header = shallowutil.buildfileblobheader(len(text), revlogflags)
407 411
408 412 return b"%s\0%s%s" % (header, text, ancestortext)
409 413
410 414
411 415 def gcserver(ui, repo):
412 416 if not repo.ui.configbool(b"remotefilelog", b"server"):
413 417 return
414 418
415 419 neededfiles = set()
416 420 heads = repo.revs(b"heads(tip~25000:) - null")
417 421
418 422 cachepath = repo.vfs.join(b"remotefilelogcache")
419 423 for head in heads:
420 424 mf = repo[head].manifest()
421 425 for filename, filenode in mf.items():
422 426 filecachepath = os.path.join(cachepath, filename, hex(filenode))
423 427 neededfiles.add(filecachepath)
424 428
425 429 # delete unneeded older files
426 430 days = repo.ui.configint(b"remotefilelog", b"serverexpiration")
427 431 expiration = time.time() - (days * 24 * 60 * 60)
428 432
429 433 progress = ui.makeprogress(_(b"removing old server cache"), unit=b"files")
430 434 progress.update(0)
431 435 for root, dirs, files in os.walk(cachepath):
432 436 for file in files:
433 437 filepath = os.path.join(root, file)
434 438 progress.increment()
435 439 if filepath in neededfiles:
436 440 continue
437 441
438 442 stat = os.stat(filepath)
439 443 if stat.st_mtime < expiration:
440 444 os.remove(filepath)
441 445
442 446 progress.complete()
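The `_walkstreamfiles` wrapper above now mirrors the extended signature explicitly. A hedged alternative, shown only as a sketch and not what this patch does, is a pass-through wrapper using **kwargs so later keywords such as `obsolescence` flow through without further edits; the explicit form keeps the shallow-repo branches readable at the cost of hand-threading each new parameter:

    from mercurial import extensions, streamclone

    # Sketch of a pass-through wrapper; `phase`, `obsolescence` and any later
    # keywords travel through **kwargs untouched.
    def _walkstreamfiles_passthrough(orig, repo, matcher=None, **kwargs):
        return orig(repo, matcher, **kwargs)

    extensions.wrapfunction(
        streamclone, b'_walkstreamfiles', _walkstreamfiles_passthrough
    )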
@@ -1,1089 +1,1098 b''
1 1 # store.py - repository store handling for Mercurial
2 2 #
3 3 # Copyright 2008 Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import collections
9 9 import functools
10 10 import os
11 11 import re
12 12 import stat
13 13 from typing import Generator
14 14
15 15 from .i18n import _
16 16 from .pycompat import getattr
17 17 from .thirdparty import attr
18 18 from .node import hex
19 19 from . import (
20 20 changelog,
21 21 error,
22 22 manifest,
23 23 policy,
24 24 pycompat,
25 25 util,
26 26 vfs as vfsmod,
27 27 )
28 28 from .utils import hashutil
29 29
30 30 parsers = policy.importmod('parsers')
31 31 # how many bytes should be read from fncache in one read
32 32 # It is done to prevent loading large fncache files into memory
33 33 fncache_chunksize = 10 ** 6
34 34
35 35
36 36 def _match_tracked_entry(entry, matcher):
37 37 """parses a fncache entry and returns whether the entry is tracking a path
38 38 matched by matcher or not.
39 39
40 40 If matcher is None, returns True"""
41 41
42 42 if matcher is None:
43 43 return True
44 44 if entry.is_filelog:
45 45 return matcher(entry.target_id)
46 46 elif entry.is_manifestlog:
47 47 return matcher.visitdir(entry.target_id.rstrip(b'/'))
48 48 raise error.ProgrammingError(b"cannot process entry %r" % entry)
49 49
50 50
51 51 # This avoids a collision between a file named foo and a dir named
52 52 # foo.i or foo.d
53 53 def _encodedir(path):
54 54 """
55 55 >>> _encodedir(b'data/foo.i')
56 56 'data/foo.i'
57 57 >>> _encodedir(b'data/foo.i/bla.i')
58 58 'data/foo.i.hg/bla.i'
59 59 >>> _encodedir(b'data/foo.i.hg/bla.i')
60 60 'data/foo.i.hg.hg/bla.i'
61 61 >>> _encodedir(b'data/foo.i\\ndata/foo.i/bla.i\\ndata/foo.i.hg/bla.i\\n')
62 62 'data/foo.i\\ndata/foo.i.hg/bla.i\\ndata/foo.i.hg.hg/bla.i\\n'
63 63 """
64 64 return (
65 65 path.replace(b".hg/", b".hg.hg/")
66 66 .replace(b".i/", b".i.hg/")
67 67 .replace(b".d/", b".d.hg/")
68 68 )
69 69
70 70
71 71 encodedir = getattr(parsers, 'encodedir', _encodedir)
72 72
73 73
74 74 def decodedir(path):
75 75 """
76 76 >>> decodedir(b'data/foo.i')
77 77 'data/foo.i'
78 78 >>> decodedir(b'data/foo.i.hg/bla.i')
79 79 'data/foo.i/bla.i'
80 80 >>> decodedir(b'data/foo.i.hg.hg/bla.i')
81 81 'data/foo.i.hg/bla.i'
82 82 """
83 83 if b".hg/" not in path:
84 84 return path
85 85 return (
86 86 path.replace(b".d.hg/", b".d/")
87 87 .replace(b".i.hg/", b".i/")
88 88 .replace(b".hg.hg/", b".hg/")
89 89 )
90 90
91 91
92 92 def _reserved():
93 93 """characters that are problematic for filesystems
94 94
95 95 * ascii escapes (0..31)
96 96 * ascii hi (126..255)
97 97 * windows specials
98 98
99 99 these characters will be escaped by the encode functions
100 100 """
101 101 winreserved = [ord(x) for x in u'\\:*?"<>|']
102 102 for x in range(32):
103 103 yield x
104 104 for x in range(126, 256):
105 105 yield x
106 106 for x in winreserved:
107 107 yield x
108 108
109 109
110 110 def _buildencodefun():
111 111 """
112 112 >>> enc, dec = _buildencodefun()
113 113
114 114 >>> enc(b'nothing/special.txt')
115 115 'nothing/special.txt'
116 116 >>> dec(b'nothing/special.txt')
117 117 'nothing/special.txt'
118 118
119 119 >>> enc(b'HELLO')
120 120 '_h_e_l_l_o'
121 121 >>> dec(b'_h_e_l_l_o')
122 122 'HELLO'
123 123
124 124 >>> enc(b'hello:world?')
125 125 'hello~3aworld~3f'
126 126 >>> dec(b'hello~3aworld~3f')
127 127 'hello:world?'
128 128
129 129 >>> enc(b'the\\x07quick\\xADshot')
130 130 'the~07quick~adshot'
131 131 >>> dec(b'the~07quick~adshot')
132 132 'the\\x07quick\\xadshot'
133 133 """
134 134 e = b'_'
135 135 xchr = pycompat.bytechr
136 136 asciistr = list(map(xchr, range(127)))
137 137 capitals = list(range(ord(b"A"), ord(b"Z") + 1))
138 138
139 139 cmap = {x: x for x in asciistr}
140 140 for x in _reserved():
141 141 cmap[xchr(x)] = b"~%02x" % x
142 142 for x in capitals + [ord(e)]:
143 143 cmap[xchr(x)] = e + xchr(x).lower()
144 144
145 145 dmap = {}
146 146 for k, v in cmap.items():
147 147 dmap[v] = k
148 148
149 149 def decode(s):
150 150 i = 0
151 151 while i < len(s):
152 152 for l in range(1, 4):
153 153 try:
154 154 yield dmap[s[i : i + l]]
155 155 i += l
156 156 break
157 157 except KeyError:
158 158 pass
159 159 else:
160 160 raise KeyError
161 161
162 162 return (
163 163 lambda s: b''.join([cmap[s[c : c + 1]] for c in range(len(s))]),
164 164 lambda s: b''.join(list(decode(s))),
165 165 )
166 166
167 167
168 168 _encodefname, _decodefname = _buildencodefun()
169 169
170 170
171 171 def encodefilename(s):
172 172 """
173 173 >>> encodefilename(b'foo.i/bar.d/bla.hg/hi:world?/HELLO')
174 174 'foo.i.hg/bar.d.hg/bla.hg.hg/hi~3aworld~3f/_h_e_l_l_o'
175 175 """
176 176 return _encodefname(encodedir(s))
177 177
178 178
179 179 def decodefilename(s):
180 180 """
181 181 >>> decodefilename(b'foo.i.hg/bar.d.hg/bla.hg.hg/hi~3aworld~3f/_h_e_l_l_o')
182 182 'foo.i/bar.d/bla.hg/hi:world?/HELLO'
183 183 """
184 184 return decodedir(_decodefname(s))
185 185
186 186
187 187 def _buildlowerencodefun():
188 188 """
189 189 >>> f = _buildlowerencodefun()
190 190 >>> f(b'nothing/special.txt')
191 191 'nothing/special.txt'
192 192 >>> f(b'HELLO')
193 193 'hello'
194 194 >>> f(b'hello:world?')
195 195 'hello~3aworld~3f'
196 196 >>> f(b'the\\x07quick\\xADshot')
197 197 'the~07quick~adshot'
198 198 """
199 199 xchr = pycompat.bytechr
200 200 cmap = {xchr(x): xchr(x) for x in range(127)}
201 201 for x in _reserved():
202 202 cmap[xchr(x)] = b"~%02x" % x
203 203 for x in range(ord(b"A"), ord(b"Z") + 1):
204 204 cmap[xchr(x)] = xchr(x).lower()
205 205
206 206 def lowerencode(s):
207 207 return b"".join([cmap[c] for c in pycompat.iterbytestr(s)])
208 208
209 209 return lowerencode
210 210
211 211
212 212 lowerencode = getattr(parsers, 'lowerencode', None) or _buildlowerencodefun()
213 213
214 214 # Windows reserved names: con, prn, aux, nul, com1..com9, lpt1..lpt9
215 215 _winres3 = (b'aux', b'con', b'prn', b'nul') # length 3
216 216 _winres4 = (b'com', b'lpt') # length 4 (with trailing 1..9)
217 217
218 218
219 219 def _auxencode(path, dotencode):
220 220 """
221 221 Encodes filenames containing names reserved by Windows or which end in
222 222 period or space. Does not touch other single reserved characters c.
223 223 Specifically, c in '\\:*?"<>|' or ord(c) <= 31 are *not* encoded here.
224 224 Additionally encodes space or period at the beginning, if dotencode is
225 225 True. Parameter path is assumed to be all lowercase.
226 226 A segment only needs encoding if a reserved name appears as a
227 227 basename (e.g. "aux", "aux.foo"). A directory or file named "foo.aux"
228 228 doesn't need encoding.
229 229
230 230 >>> s = b'.foo/aux.txt/txt.aux/con/prn/nul/foo.'
231 231 >>> _auxencode(s.split(b'/'), True)
232 232 ['~2efoo', 'au~78.txt', 'txt.aux', 'co~6e', 'pr~6e', 'nu~6c', 'foo~2e']
233 233 >>> s = b'.com1com2/lpt9.lpt4.lpt1/conprn/com0/lpt0/foo.'
234 234 >>> _auxencode(s.split(b'/'), False)
235 235 ['.com1com2', 'lp~749.lpt4.lpt1', 'conprn', 'com0', 'lpt0', 'foo~2e']
236 236 >>> _auxencode([b'foo. '], True)
237 237 ['foo.~20']
238 238 >>> _auxencode([b' .foo'], True)
239 239 ['~20.foo']
240 240 """
241 241 for i, n in enumerate(path):
242 242 if not n:
243 243 continue
244 244 if dotencode and n[0] in b'. ':
245 245 n = b"~%02x" % ord(n[0:1]) + n[1:]
246 246 path[i] = n
247 247 else:
248 248 l = n.find(b'.')
249 249 if l == -1:
250 250 l = len(n)
251 251 if (l == 3 and n[:3] in _winres3) or (
252 252 l == 4
253 253 and n[3:4] <= b'9'
254 254 and n[3:4] >= b'1'
255 255 and n[:3] in _winres4
256 256 ):
257 257 # encode third letter ('aux' -> 'au~78')
258 258 ec = b"~%02x" % ord(n[2:3])
259 259 n = n[0:2] + ec + n[3:]
260 260 path[i] = n
261 261 if n[-1] in b'. ':
262 262 # encode last period or space ('foo...' -> 'foo..~2e')
263 263 path[i] = n[:-1] + b"~%02x" % ord(n[-1:])
264 264 return path
265 265
266 266
267 267 _maxstorepathlen = 120
268 268 _dirprefixlen = 8
269 269 _maxshortdirslen = 8 * (_dirprefixlen + 1) - 4
270 270
271 271
272 272 def _hashencode(path, dotencode):
273 273 digest = hex(hashutil.sha1(path).digest())
274 274 le = lowerencode(path[5:]).split(b'/') # skips prefix 'data/' or 'meta/'
275 275 parts = _auxencode(le, dotencode)
276 276 basename = parts[-1]
277 277 _root, ext = os.path.splitext(basename)
278 278 sdirs = []
279 279 sdirslen = 0
280 280 for p in parts[:-1]:
281 281 d = p[:_dirprefixlen]
282 282 if d[-1] in b'. ':
283 283 # Windows can't access dirs ending in period or space
284 284 d = d[:-1] + b'_'
285 285 if sdirslen == 0:
286 286 t = len(d)
287 287 else:
288 288 t = sdirslen + 1 + len(d)
289 289 if t > _maxshortdirslen:
290 290 break
291 291 sdirs.append(d)
292 292 sdirslen = t
293 293 dirs = b'/'.join(sdirs)
294 294 if len(dirs) > 0:
295 295 dirs += b'/'
296 296 res = b'dh/' + dirs + digest + ext
297 297 spaceleft = _maxstorepathlen - len(res)
298 298 if spaceleft > 0:
299 299 filler = basename[:spaceleft]
300 300 res = b'dh/' + dirs + filler + digest + ext
301 301 return res
302 302
303 303
304 304 def _hybridencode(path, dotencode):
305 305 """encodes path with a length limit
306 306
307 307 Encodes all paths that begin with 'data/', according to the following.
308 308
309 309 Default encoding (reversible):
310 310
311 311 Encodes all uppercase letters 'X' as '_x'. All reserved or illegal
312 312 characters are encoded as '~xx', where xx is the two digit hex code
313 313 of the character (see encodefilename).
314 314 Relevant path components consisting of Windows reserved filenames are
315 315 masked by encoding the third character ('aux' -> 'au~78', see _auxencode).
316 316
317 317 Hashed encoding (not reversible):
318 318
319 319 If the default-encoded path is longer than _maxstorepathlen, a
320 320 non-reversible hybrid hashing of the path is done instead.
321 321 This encoding uses up to _dirprefixlen characters of all directory
322 322 levels of the lowerencoded path, but not more levels than can fit into
323 323 _maxshortdirslen.
324 324 Then follows the filler followed by the sha digest of the full path.
325 325 The filler is the beginning of the basename of the lowerencoded path
326 326 (the basename is everything after the last path separator). The filler
327 327 is as long as possible, filling in characters from the basename until
328 328 the encoded path has _maxstorepathlen characters (or all chars of the
329 329 basename have been taken).
330 330 The extension (e.g. '.i' or '.d') is preserved.
331 331
332 332 The string 'data/' at the beginning is replaced with 'dh/', if the hashed
333 333 encoding was used.
334 334 """
335 335 path = encodedir(path)
336 336 ef = _encodefname(path).split(b'/')
337 337 res = b'/'.join(_auxencode(ef, dotencode))
338 338 if len(res) > _maxstorepathlen:
339 339 res = _hashencode(path, dotencode)
340 340 return res
341 341
342 342
343 343 def _pathencode(path):
344 344 de = encodedir(path)
345 345 if len(path) > _maxstorepathlen:
346 346 return _hashencode(de, True)
347 347 ef = _encodefname(de).split(b'/')
348 348 res = b'/'.join(_auxencode(ef, True))
349 349 if len(res) > _maxstorepathlen:
350 350 return _hashencode(de, True)
351 351 return res
352 352
353 353
354 354 _pathencode = getattr(parsers, 'pathencode', _pathencode)
355 355
356 356
357 357 def _plainhybridencode(f):
358 358 return _hybridencode(f, False)
359 359
360 360
361 361 def _calcmode(vfs):
362 362 try:
363 363 # files in .hg/ will be created using this mode
364 364 mode = vfs.stat().st_mode
365 365 # avoid some useless chmods
366 366 if (0o777 & ~util.umask) == (0o777 & mode):
367 367 mode = None
368 368 except OSError:
369 369 mode = None
370 370 return mode
371 371
372 372
373 373 _data = [
374 374 b'bookmarks',
375 375 b'narrowspec',
376 376 b'data',
377 377 b'meta',
378 378 b'00manifest.d',
379 379 b'00manifest.i',
380 380 b'00changelog.d',
381 381 b'00changelog.i',
382 382 b'phaseroots',
383 383 b'obsstore',
384 384 b'requires',
385 385 ]
386 386
387 387 REVLOG_FILES_MAIN_EXT = (b'.i',)
388 388 REVLOG_FILES_OTHER_EXT = (
389 389 b'.idx',
390 390 b'.d',
391 391 b'.dat',
392 392 b'.n',
393 393 b'.nd',
394 394 b'.sda',
395 395 )
396 396 # file extension that also use a `-SOMELONGIDHASH.ext` form
397 397 REVLOG_FILES_LONG_EXT = (
398 398 b'.nd',
399 399 b'.idx',
400 400 b'.dat',
401 401 b'.sda',
402 402 )
403 403 # files that are "volatile" and might change between listing and streaming
404 404 #
405 405 # note: the ".nd" files are nodemap data and won't "change" but they might be
406 406 # deleted.
407 407 REVLOG_FILES_VOLATILE_EXT = (b'.n', b'.nd')
408 408
409 409 # some exception to the above matching
410 410 #
411 411 # XXX This is currently not in use because of issue6542
412 412 EXCLUDED = re.compile(br'.*undo\.[^/]+\.(nd?|i)$')
413 413
414 414
415 415 def is_revlog(f, kind, st):
416 416 if kind != stat.S_IFREG:
417 417 return None
418 418 return revlog_type(f)
419 419
420 420
421 421 def revlog_type(f):
422 422 # XXX we need to filter `undo.` created by the transaction here, however
423 423 # being naive about it also filters revlogs for `undo.*` files, leading to
424 424 # issue6542. So we no longer use EXCLUDED.
425 425 if f.endswith(REVLOG_FILES_MAIN_EXT):
426 426 return FILEFLAGS_REVLOG_MAIN
427 427 elif f.endswith(REVLOG_FILES_OTHER_EXT):
428 428 t = FILETYPE_FILELOG_OTHER
429 429 if f.endswith(REVLOG_FILES_VOLATILE_EXT):
430 430 t |= FILEFLAGS_VOLATILE
431 431 return t
432 432 return None
433 433
434 434
435 435 # the file is part of changelog data
436 436 FILEFLAGS_CHANGELOG = 1 << 13
437 437 # the file is part of manifest data
438 438 FILEFLAGS_MANIFESTLOG = 1 << 12
439 439 # the file is part of filelog data
440 440 FILEFLAGS_FILELOG = 1 << 11
441 441 # files that are not directly part of a revlog
442 442 FILEFLAGS_OTHER = 1 << 10
443 443
444 444 # the main entry point for a revlog
445 445 FILEFLAGS_REVLOG_MAIN = 1 << 1
446 446 # a secondary file for a revlog
447 447 FILEFLAGS_REVLOG_OTHER = 1 << 0
448 448
449 449 # files that are "volatile" and might change between listing and streaming
450 450 FILEFLAGS_VOLATILE = 1 << 20
451 451
452 452 FILETYPE_CHANGELOG_MAIN = FILEFLAGS_CHANGELOG | FILEFLAGS_REVLOG_MAIN
453 453 FILETYPE_CHANGELOG_OTHER = FILEFLAGS_CHANGELOG | FILEFLAGS_REVLOG_OTHER
454 454 FILETYPE_MANIFESTLOG_MAIN = FILEFLAGS_MANIFESTLOG | FILEFLAGS_REVLOG_MAIN
455 455 FILETYPE_MANIFESTLOG_OTHER = FILEFLAGS_MANIFESTLOG | FILEFLAGS_REVLOG_OTHER
456 456 FILETYPE_FILELOG_MAIN = FILEFLAGS_FILELOG | FILEFLAGS_REVLOG_MAIN
457 457 FILETYPE_FILELOG_OTHER = FILEFLAGS_FILELOG | FILEFLAGS_REVLOG_OTHER
458 458 FILETYPE_OTHER = FILEFLAGS_OTHER
459 459
460 460
461 461 @attr.s(slots=True, init=False)
462 462 class BaseStoreEntry:
463 463 """An entry in the store
464 464
465 465 This is returned by `store.walk` and represents some data in the store."""
466 466
467 467
468 468 @attr.s(slots=True, init=False)
469 469 class SimpleStoreEntry(BaseStoreEntry):
470 470 """A generic entry in the store"""
471 471
472 472 is_revlog = False
473 473
474 474 _entry_path = attr.ib()
475 475 _is_volatile = attr.ib(default=False)
476 476 _file_size = attr.ib(default=None)
477 477
478 478 def __init__(
479 479 self,
480 480 entry_path,
481 481 is_volatile=False,
482 482 file_size=None,
483 483 ):
484 484 super().__init__()
485 485 self._entry_path = entry_path
486 486 self._is_volatile = is_volatile
487 487 self._file_size = file_size
488 488
489 489 def files(self):
490 490 return [
491 491 StoreFile(
492 492 unencoded_path=self._entry_path,
493 493 file_size=self._file_size,
494 494 is_volatile=self._is_volatile,
495 495 )
496 496 ]
497 497
498 498
499 499 @attr.s(slots=True, init=False)
500 500 class RevlogStoreEntry(BaseStoreEntry):
501 501 """A revlog entry in the store"""
502 502
503 503 is_revlog = True
504 504
505 505 revlog_type = attr.ib(default=None)
506 506 target_id = attr.ib(default=None)
507 507 _path_prefix = attr.ib(default=None)
508 508 _details = attr.ib(default=None)
509 509
510 510 def __init__(
511 511 self,
512 512 revlog_type,
513 513 path_prefix,
514 514 target_id,
515 515 details,
516 516 ):
517 517 super().__init__()
518 518 self.revlog_type = revlog_type
519 519 self.target_id = target_id
520 520 self._path_prefix = path_prefix
521 521 assert b'.i' in details, (path_prefix, details)
522 522 self._details = details
523 523
524 524 @property
525 525 def is_changelog(self):
526 526 return self.revlog_type & FILEFLAGS_CHANGELOG
527 527
528 528 @property
529 529 def is_manifestlog(self):
530 530 return self.revlog_type & FILEFLAGS_MANIFESTLOG
531 531
532 532 @property
533 533 def is_filelog(self):
534 534 return self.revlog_type & FILEFLAGS_FILELOG
535 535
536 536 def main_file_path(self):
537 537 """unencoded path of the main revlog file"""
538 538 return self._path_prefix + b'.i'
539 539
540 540 def files(self):
541 541 files = []
542 542 for ext in sorted(self._details, key=_ext_key):
543 543 path = self._path_prefix + ext
544 544 data = self._details[ext]
545 545 files.append(StoreFile(unencoded_path=path, **data))
546 546 return files
547 547
548 548
549 549 @attr.s(slots=True)
550 550 class StoreFile:
551 551 """a file matching an entry"""
552 552
553 553 unencoded_path = attr.ib()
554 554 _file_size = attr.ib(default=None)
555 555 is_volatile = attr.ib(default=False)
556 556
557 557 def file_size(self, vfs):
558 558 if self._file_size is not None:
559 559 return self._file_size
560 560 try:
561 561 return vfs.stat(self.unencoded_path).st_size
562 562 except FileNotFoundError:
563 563 return 0
564 564
565 565
566 566 def _gather_revlog(files_data):
567 567 """group files per revlog prefix
568 568
569 569 This returns a two-level nested dict. The top level key is the revlog prefix
570 570 without extension; the second level maps every file "suffix" seen for
571 571 this revlog to arbitrary file data.
572 572 """
573 573 revlogs = collections.defaultdict(dict)
574 574 for u, value in files_data:
575 575 name, ext = _split_revlog_ext(u)
576 576 revlogs[name][ext] = value
577 577 return sorted(revlogs.items())
578 578
579 579
580 580 def _split_revlog_ext(filename):
581 581 """split the revlog file prefix from the variable extension"""
582 582 if filename.endswith(REVLOG_FILES_LONG_EXT):
583 583 char = b'-'
584 584 else:
585 585 char = b'.'
586 586 idx = filename.rfind(char)
587 587 return filename[:idx], filename[idx:]
588 588
589 589
590 590 def _ext_key(ext):
591 591 """a key to order revlog suffix
592 592
593 593 important to issue .i after the other entries."""
594 594 # the only important part of this order is to keep the `.i` last.
595 595 if ext.endswith(b'.n'):
596 596 return (0, ext)
597 597 elif ext.endswith(b'.nd'):
598 598 return (10, ext)
599 599 elif ext.endswith(b'.d'):
600 600 return (20, ext)
601 601 elif ext.endswith(b'.i'):
602 602 return (50, ext)
603 603 else:
604 604 return (40, ext)
605 605
606 606
607 607 class basicstore:
608 608 '''base class for local repository stores'''
609 609
610 610 def __init__(self, path, vfstype):
611 611 vfs = vfstype(path)
612 612 self.path = vfs.base
613 613 self.createmode = _calcmode(vfs)
614 614 vfs.createmode = self.createmode
615 615 self.rawvfs = vfs
616 616 self.vfs = vfsmod.filtervfs(vfs, encodedir)
617 617 self.opener = self.vfs
618 618
619 619 def join(self, f):
620 620 return self.path + b'/' + encodedir(f)
621 621
622 622 def _walk(self, relpath, recurse, undecodable=None):
623 623 '''yields (revlog_type, unencoded, size)'''
624 624 path = self.path
625 625 if relpath:
626 626 path += b'/' + relpath
627 627 striplen = len(self.path) + 1
628 628 l = []
629 629 if self.rawvfs.isdir(path):
630 630 visit = [path]
631 631 readdir = self.rawvfs.readdir
632 632 while visit:
633 633 p = visit.pop()
634 634 for f, kind, st in readdir(p, stat=True):
635 635 fp = p + b'/' + f
636 636 rl_type = is_revlog(f, kind, st)
637 637 if rl_type is not None:
638 638 n = util.pconvert(fp[striplen:])
639 639 l.append((decodedir(n), (rl_type, st.st_size)))
640 640 elif kind == stat.S_IFDIR and recurse:
641 641 visit.append(fp)
642 642
643 643 l.sort()
644 644 return l
645 645
646 646 def changelog(self, trypending, concurrencychecker=None):
647 647 return changelog.changelog(
648 648 self.vfs,
649 649 trypending=trypending,
650 650 concurrencychecker=concurrencychecker,
651 651 )
652 652
653 653 def manifestlog(self, repo, storenarrowmatch):
654 654 rootstore = manifest.manifestrevlog(repo.nodeconstants, self.vfs)
655 655 return manifest.manifestlog(self.vfs, repo, rootstore, storenarrowmatch)
656 656
657 657 def data_entries(
658 658 self, matcher=None, undecodable=None
659 659 ) -> Generator[BaseStoreEntry, None, None]:
660 660 """Like walk, but excluding the changelog and root manifest.
661 661
662 662 When [undecodable] is None, revlog names that can't be
663 663 decoded cause an exception. When it is provided, it should
664 664 be a list and the filenames that can't be decoded are added
665 665 to it instead. This is very rarely needed."""
666 666 dirs = [
667 667 (b'data', FILEFLAGS_FILELOG),
668 668 (b'meta', FILEFLAGS_MANIFESTLOG),
669 669 ]
670 670 for base_dir, rl_type in dirs:
671 671 files = self._walk(base_dir, True, undecodable=undecodable)
672 672 files = (f for f in files if f[1][0] is not None)
673 673 for revlog, details in _gather_revlog(files):
674 674 file_details = {}
675 675 revlog_target_id = revlog.split(b'/', 1)[1]
676 676 for ext, (t, s) in sorted(details.items()):
677 677 file_details[ext] = {
678 678 'is_volatile': bool(t & FILEFLAGS_VOLATILE),
679 679 'file_size': s,
680 680 }
681 681 yield RevlogStoreEntry(
682 682 path_prefix=revlog,
683 683 revlog_type=rl_type,
684 684 target_id=revlog_target_id,
685 685 details=file_details,
686 686 )
687 687
688 def top_entries(self, phase=False) -> Generator[BaseStoreEntry, None, None]:
688 def top_entries(
689 self, phase=False, obsolescence=False
690 ) -> Generator[BaseStoreEntry, None, None]:
689 691 if phase and self.vfs.exists(b'phaseroots'):
690 692 yield SimpleStoreEntry(
691 693 entry_path=b'phaseroots',
692 694 is_volatile=True,
693 695 )
694 696
697 if obsolescence and self.vfs.exists(b'obsstore'):
698 # XXX if we had the file size it could be non-volatile
699 yield SimpleStoreEntry(
700 entry_path=b'obsstore',
701 is_volatile=True,
702 )
703
695 704 files = reversed(self._walk(b'', False))
696 705
697 706 changelogs = collections.defaultdict(dict)
698 707 manifestlogs = collections.defaultdict(dict)
699 708
700 709 for u, (t, s) in files:
701 710 if u.startswith(b'00changelog'):
702 711 name, ext = _split_revlog_ext(u)
703 712 changelogs[name][ext] = (t, s)
704 713 elif u.startswith(b'00manifest'):
705 714 name, ext = _split_revlog_ext(u)
706 715 manifestlogs[name][ext] = (t, s)
707 716 else:
708 717 yield SimpleStoreEntry(
709 718 entry_path=u,
710 719 is_volatile=bool(t & FILEFLAGS_VOLATILE),
711 720 file_size=s,
712 721 )
713 722 # yield manifest before changelog
714 723 top_rl = [
715 724 (manifestlogs, FILEFLAGS_MANIFESTLOG),
716 725 (changelogs, FILEFLAGS_CHANGELOG),
717 726 ]
718 727 assert len(manifestlogs) <= 1
719 728 assert len(changelogs) <= 1
720 729 for data, revlog_type in top_rl:
721 730 for revlog, details in sorted(data.items()):
722 731 file_details = {}
723 732 for ext, (t, s) in details.items():
724 733 file_details[ext] = {
725 734 'is_volatile': bool(t & FILEFLAGS_VOLATILE),
726 735 'file_size': s,
727 736 }
728 737 yield RevlogStoreEntry(
729 738 path_prefix=revlog,
730 739 revlog_type=revlog_type,
731 740 target_id=b'',
732 741 details=file_details,
733 742 )
734 743
735 744 def walk(
736 self, matcher=None, phase=False
745 self, matcher=None, phase=False, obsolescence=False
737 746 ) -> Generator[BaseStoreEntry, None, None]:
738 747 """return files related to data storage (ie: revlogs)
739 748
740 749 yields instance from BaseStoreEntry subclasses
741 750
742 751 if a matcher is passed, only the storage files of tracked paths
743 752 that match the matcher are yielded
744 753 """
745 754 # yield data files first
746 755 for x in self.data_entries(matcher):
747 756 yield x
748 for x in self.top_entries(phase=phase):
757 for x in self.top_entries(phase=phase, obsolescence=obsolescence):
749 758 yield x
750 759
751 760 def copylist(self):
752 761 return _data
753 762
754 763 def write(self, tr):
755 764 pass
756 765
757 766 def invalidatecaches(self):
758 767 pass
759 768
760 769 def markremoved(self, fn):
761 770 pass
762 771
763 772 def __contains__(self, path):
764 773 '''Checks if the store contains path'''
765 774 path = b"/".join((b"data", path))
766 775 # file?
767 776 if self.vfs.exists(path + b".i"):
768 777 return True
769 778 # dir?
770 779 if not path.endswith(b"/"):
771 780 path = path + b"/"
772 781 return self.vfs.exists(path)
773 782
774 783
775 784 class encodedstore(basicstore):
776 785 def __init__(self, path, vfstype):
777 786 vfs = vfstype(path + b'/store')
778 787 self.path = vfs.base
779 788 self.createmode = _calcmode(vfs)
780 789 vfs.createmode = self.createmode
781 790 self.rawvfs = vfs
782 791 self.vfs = vfsmod.filtervfs(vfs, encodefilename)
783 792 self.opener = self.vfs
784 793
785 794 def _walk(self, relpath, recurse, undecodable=None):
786 795 old = super()._walk(relpath, recurse)
787 796 new = []
788 797 for f1, value in old:
789 798 try:
790 799 f2 = decodefilename(f1)
791 800 except KeyError:
792 801 if undecodable is None:
793 802 msg = _(b'undecodable revlog name %s') % f1
794 803 raise error.StorageError(msg)
795 804 else:
796 805 undecodable.append(f1)
797 806 continue
798 807 new.append((f2, value))
799 808 return new
800 809
801 810 def data_entries(
802 811 self, matcher=None, undecodable=None
803 812 ) -> Generator[BaseStoreEntry, None, None]:
804 813 entries = super(encodedstore, self).data_entries(
805 814 undecodable=undecodable
806 815 )
807 816 for entry in entries:
808 817 if _match_tracked_entry(entry, matcher):
809 818 yield entry
810 819
811 820 def join(self, f):
812 821 return self.path + b'/' + encodefilename(f)
813 822
814 823 def copylist(self):
815 824 return [b'requires', b'00changelog.i'] + [b'store/' + f for f in _data]
816 825
817 826
818 827 class fncache:
819 828 # the filename used to be partially encoded
820 829 # hence the encodedir/decodedir dance
821 830 def __init__(self, vfs):
822 831 self.vfs = vfs
823 832 self._ignores = set()
824 833 self.entries = None
825 834 self._dirty = False
826 835 # set of new additions to fncache
827 836 self.addls = set()
828 837
829 838 def ensureloaded(self, warn=None):
830 839 """read the fncache file if not already read.
831 840
832 841 If the file on disk is corrupted, raise. If warn is provided,
833 842 warn and keep going instead."""
834 843 if self.entries is None:
835 844 self._load(warn)
836 845
837 846 def _load(self, warn=None):
838 847 '''fill the entries from the fncache file'''
839 848 self._dirty = False
840 849 try:
841 850 fp = self.vfs(b'fncache', mode=b'rb')
842 851 except IOError:
843 852 # skip nonexistent file
844 853 self.entries = set()
845 854 return
846 855
847 856 self.entries = set()
848 857 chunk = b''
849 858 for c in iter(functools.partial(fp.read, fncache_chunksize), b''):
850 859 chunk += c
851 860 try:
852 861 p = chunk.rindex(b'\n')
853 862 self.entries.update(decodedir(chunk[: p + 1]).splitlines())
854 863 chunk = chunk[p + 1 :]
855 864 except ValueError:
856 865 # substring '\n' not found, maybe the entry is bigger than the
857 866 # chunksize, so let's keep iterating
858 867 pass
859 868
860 869 if chunk:
861 870 msg = _(b"fncache does not ends with a newline")
862 871 if warn:
863 872 warn(msg + b'\n')
864 873 else:
865 874 raise error.Abort(
866 875 msg,
867 876 hint=_(
868 877 b"use 'hg debugrebuildfncache' to "
869 878 b"rebuild the fncache"
870 879 ),
871 880 )
872 881 self._checkentries(fp, warn)
873 882 fp.close()
874 883
875 884 def _checkentries(self, fp, warn):
876 885 """make sure there is no empty string in entries"""
877 886 if b'' in self.entries:
878 887 fp.seek(0)
879 888 for n, line in enumerate(fp):
880 889 if not line.rstrip(b'\n'):
881 890 t = _(b'invalid entry in fncache, line %d') % (n + 1)
882 891 if warn:
883 892 warn(t + b'\n')
884 893 else:
885 894 raise error.Abort(t)
886 895
887 896 def write(self, tr):
888 897 if self._dirty:
889 898 assert self.entries is not None
890 899 self.entries = self.entries | self.addls
891 900 self.addls = set()
892 901 tr.addbackup(b'fncache')
893 902 fp = self.vfs(b'fncache', mode=b'wb', atomictemp=True)
894 903 if self.entries:
895 904 fp.write(encodedir(b'\n'.join(self.entries) + b'\n'))
896 905 fp.close()
897 906 self._dirty = False
898 907 if self.addls:
899 908 # if we have just new entries, let's append them to the fncache
900 909 tr.addbackup(b'fncache')
901 910 fp = self.vfs(b'fncache', mode=b'ab', atomictemp=True)
902 911 if self.addls:
903 912 fp.write(encodedir(b'\n'.join(self.addls) + b'\n'))
904 913 fp.close()
905 914 self.entries = None
906 915 self.addls = set()
907 916
908 917 def addignore(self, fn):
909 918 self._ignores.add(fn)
910 919
911 920 def add(self, fn):
912 921 if fn in self._ignores:
913 922 return
914 923 if self.entries is None:
915 924 self._load()
916 925 if fn not in self.entries:
917 926 self.addls.add(fn)
918 927
919 928 def remove(self, fn):
920 929 if self.entries is None:
921 930 self._load()
922 931 if fn in self.addls:
923 932 self.addls.remove(fn)
924 933 return
925 934 try:
926 935 self.entries.remove(fn)
927 936 self._dirty = True
928 937 except KeyError:
929 938 pass
930 939
931 940 def __contains__(self, fn):
932 941 if fn in self.addls:
933 942 return True
934 943 if self.entries is None:
935 944 self._load()
936 945 return fn in self.entries
937 946
938 947 def __iter__(self):
939 948 if self.entries is None:
940 949 self._load()
941 950 return iter(self.entries | self.addls)
942 951
943 952
944 953 class _fncachevfs(vfsmod.proxyvfs):
945 954 def __init__(self, vfs, fnc, encode):
946 955 vfsmod.proxyvfs.__init__(self, vfs)
947 956 self.fncache = fnc
948 957 self.encode = encode
949 958
950 959 def __call__(self, path, mode=b'r', *args, **kw):
951 960 encoded = self.encode(path)
952 961 if (
953 962 mode not in (b'r', b'rb')
954 963 and (path.startswith(b'data/') or path.startswith(b'meta/'))
955 964 and revlog_type(path) is not None
956 965 ):
957 966 # do not trigger a fncache load when adding a file that already is
958 967 # known to exist.
959 968 notload = self.fncache.entries is None and self.vfs.exists(encoded)
960 969 if notload and b'r+' in mode and not self.vfs.stat(encoded).st_size:
961 970 # when appending to an existing file, if the file has size zero,
962 971 # it should be considered as missing. Such zero-size files are
963 972 # the result of truncation when a transaction is aborted.
964 973 notload = False
965 974 if not notload:
966 975 self.fncache.add(path)
967 976 return self.vfs(encoded, mode, *args, **kw)
968 977
969 978 def join(self, path):
970 979 if path:
971 980 return self.vfs.join(self.encode(path))
972 981 else:
973 982 return self.vfs.join(path)
974 983
975 984 def register_file(self, path):
976 985 """generic hook point to lets fncache steer its stew"""
977 986 if path.startswith(b'data/') or path.startswith(b'meta/'):
978 987 self.fncache.add(path)
979 988
980 989
981 990 class fncachestore(basicstore):
982 991 def __init__(self, path, vfstype, dotencode):
983 992 if dotencode:
984 993 encode = _pathencode
985 994 else:
986 995 encode = _plainhybridencode
987 996 self.encode = encode
988 997 vfs = vfstype(path + b'/store')
989 998 self.path = vfs.base
990 999 self.pathsep = self.path + b'/'
991 1000 self.createmode = _calcmode(vfs)
992 1001 vfs.createmode = self.createmode
993 1002 self.rawvfs = vfs
994 1003 fnc = fncache(vfs)
995 1004 self.fncache = fnc
996 1005 self.vfs = _fncachevfs(vfs, fnc, encode)
997 1006 self.opener = self.vfs
998 1007
999 1008 def join(self, f):
1000 1009 return self.pathsep + self.encode(f)
1001 1010
1002 1011 def getsize(self, path):
1003 1012 return self.rawvfs.stat(path).st_size
1004 1013
1005 1014 def data_entries(
1006 1015 self, matcher=None, undecodable=None
1007 1016 ) -> Generator[BaseStoreEntry, None, None]:
1008 1017 files = ((f, revlog_type(f)) for f in self.fncache)
1009 1018 # Note: all files in fncache should be revlog related. However, the
1010 1019 # fncache might contain such files added by previous versions of
1011 1020 # Mercurial.
1012 1021 files = (f for f in files if f[1] is not None)
1013 1022 by_revlog = _gather_revlog(files)
1014 1023 for revlog, details in by_revlog:
1015 1024 file_details = {}
1016 1025 if revlog.startswith(b'data/'):
1017 1026 rl_type = FILEFLAGS_FILELOG
1018 1027 revlog_target_id = revlog.split(b'/', 1)[1]
1019 1028 elif revlog.startswith(b'meta/'):
1020 1029 rl_type = FILEFLAGS_MANIFESTLOG
1021 1030 # drop the initial directory and the `00manifest` file part
1022 1031 tmp = revlog.split(b'/', 1)[1]
1023 1032 revlog_target_id = tmp.rsplit(b'/', 1)[0] + b'/'
1024 1033 else:
1025 1034 # unreachable
1026 1035 assert False, revlog
1027 1036 for ext, t in details.items():
1028 1037 file_details[ext] = {
1029 1038 'is_volatile': bool(t & FILEFLAGS_VOLATILE),
1030 1039 }
1031 1040 entry = RevlogStoreEntry(
1032 1041 path_prefix=revlog,
1033 1042 revlog_type=rl_type,
1034 1043 target_id=revlog_target_id,
1035 1044 details=file_details,
1036 1045 )
1037 1046 if _match_tracked_entry(entry, matcher):
1038 1047 yield entry
1039 1048
1040 1049 def copylist(self):
1041 1050 d = (
1042 1051 b'bookmarks',
1043 1052 b'narrowspec',
1044 1053 b'data',
1045 1054 b'meta',
1046 1055 b'dh',
1047 1056 b'fncache',
1048 1057 b'phaseroots',
1049 1058 b'obsstore',
1050 1059 b'00manifest.d',
1051 1060 b'00manifest.i',
1052 1061 b'00changelog.d',
1053 1062 b'00changelog.i',
1054 1063 b'requires',
1055 1064 )
1056 1065 return [b'requires', b'00changelog.i'] + [b'store/' + f for f in d]
1057 1066
1058 1067 def write(self, tr):
1059 1068 self.fncache.write(tr)
1060 1069
1061 1070 def invalidatecaches(self):
1062 1071 self.fncache.entries = None
1063 1072 self.fncache.addls = set()
1064 1073
1065 1074 def markremoved(self, fn):
1066 1075 self.fncache.remove(fn)
1067 1076
1068 1077 def _exists(self, f):
1069 1078 ef = self.encode(f)
1070 1079 try:
1071 1080 self.getsize(ef)
1072 1081 return True
1073 1082 except FileNotFoundError:
1074 1083 return False
1075 1084
1076 1085 def __contains__(self, path):
1077 1086 '''Checks if the store contains path'''
1078 1087 path = b"/".join((b"data", path))
1079 1088 # check for files (exact match)
1080 1089 e = path + b'.i'
1081 1090 if e in self.fncache and self._exists(e):
1082 1091 return True
1083 1092 # now check for directories (prefix match)
1084 1093 if not path.endswith(b'/'):
1085 1094 path += b'/'
1086 1095 for e in self.fncache:
1087 1096 if e.startswith(path) and self._exists(e):
1088 1097 return True
1089 1098 return False
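Because the obsstore entry is yielded without a recorded size (see the XXX comment in `top_entries` above), `StoreFile.file_size` resolves it lazily by statting through the store vfs and treats a missing file as size 0. A hypothetical helper, `obsstore_size`, is sketched below to show how the new keyword and that lazy lookup combine; it is not part of Mercurial:

    def obsstore_size(store):
        """Current size of .hg/store/obsstore, or 0 if absent (sketch only)."""
        for entry in store.top_entries(obsolescence=True):
            if entry.is_revlog:
                continue
            for f in entry.files():
                if f.unencoded_path == b'obsstore':
                    # no size was recorded for this entry, so this stats the
                    # file through store.vfs and returns 0 if it vanished
                    return f.file_size(store.vfs)
        return 0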
@@ -1,936 +1,936 b''
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import contextlib
10 10 import os
11 11 import struct
12 12
13 13 from .i18n import _
14 14 from .pycompat import open
15 15 from .interfaces import repository
16 16 from . import (
17 17 bookmarks,
18 18 cacheutil,
19 19 error,
20 20 narrowspec,
21 21 phases,
22 22 pycompat,
23 23 requirements as requirementsmod,
24 24 scmutil,
25 25 store,
26 26 transaction,
27 27 util,
28 28 )
29 29 from .revlogutils import (
30 30 nodemap,
31 31 )
32 32
33 33
34 34 def new_stream_clone_requirements(default_requirements, streamed_requirements):
35 35 """determine the final set of requirement for a new stream clone
36 36
37 37 this method combines the "default" requirements that a new repository would
38 38 use with the constraints we get from the stream clone content. We keep local
39 39 configuration choices when possible.
40 40 """
41 41 requirements = set(default_requirements)
42 42 requirements -= requirementsmod.STREAM_FIXED_REQUIREMENTS
43 43 requirements.update(streamed_requirements)
44 44 return requirements
45 45
46 46
47 47 def streamed_requirements(repo):
48 48 """the set of requirement the new clone will have to support
49 49
50 50 This is used for advertising the stream options and to generate the actual
51 51 stream content."""
52 52 requiredformats = (
53 53 repo.requirements & requirementsmod.STREAM_FIXED_REQUIREMENTS
54 54 )
55 55 return requiredformats
56 56
57 57
58 58 def canperformstreamclone(pullop, bundle2=False):
59 59 """Whether it is possible to perform a streaming clone as part of pull.
60 60
61 61 ``bundle2`` will cause the function to consider stream clone through
62 62 bundle2 and only through bundle2.
63 63
64 64 Returns a tuple of (supported, requirements). ``supported`` is True if
65 65 streaming clone is supported and False otherwise. ``requirements`` is
66 66 a set of repo requirements from the remote, or ``None`` if stream clone
67 67 isn't supported.
68 68 """
69 69 repo = pullop.repo
70 70 remote = pullop.remote
71 71
72 72 bundle2supported = False
73 73 if pullop.canusebundle2:
74 74 if b'v2' in pullop.remotebundle2caps.get(b'stream', []):
75 75 bundle2supported = True
76 76 # else
77 77 # Server doesn't support bundle2 stream clone or doesn't support
78 78 # the versions we support. Fall back and possibly allow legacy.
79 79
80 80 # Ensures legacy code path uses available bundle2.
81 81 if bundle2supported and not bundle2:
82 82 return False, None
83 83 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
84 84 elif bundle2 and not bundle2supported:
85 85 return False, None
86 86
87 87 # Streaming clone only works on empty repositories.
88 88 if len(repo):
89 89 return False, None
90 90
91 91 # Streaming clone only works if all data is being requested.
92 92 if pullop.heads:
93 93 return False, None
94 94
95 95 streamrequested = pullop.streamclonerequested
96 96
97 97 # If we don't have a preference, let the server decide for us. This
98 98 # likely only comes into play in LANs.
99 99 if streamrequested is None:
100 100 # The server can advertise whether to prefer streaming clone.
101 101 streamrequested = remote.capable(b'stream-preferred')
102 102
103 103 if not streamrequested:
104 104 return False, None
105 105
106 106 # In order for stream clone to work, the client has to support all the
107 107 # requirements advertised by the server.
108 108 #
109 109 # The server advertises its requirements via the "stream" and "streamreqs"
110 110 # capability. "stream" (a value-less capability) is advertised if and only
111 111 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
112 112 # is advertised and contains a comma-delimited list of requirements.
113 113 requirements = set()
114 114 if remote.capable(b'stream'):
115 115 requirements.add(requirementsmod.REVLOGV1_REQUIREMENT)
116 116 else:
117 117 streamreqs = remote.capable(b'streamreqs')
118 118 # This is weird and shouldn't happen with modern servers.
119 119 if not streamreqs:
120 120 pullop.repo.ui.warn(
121 121 _(
122 122 b'warning: stream clone requested but server has them '
123 123 b'disabled\n'
124 124 )
125 125 )
126 126 return False, None
127 127
128 128 streamreqs = set(streamreqs.split(b','))
129 129 # Server requires something we don't support. Bail.
130 130 missingreqs = streamreqs - repo.supported
131 131 if missingreqs:
132 132 pullop.repo.ui.warn(
133 133 _(
134 134 b'warning: stream clone requested but client is missing '
135 135 b'requirements: %s\n'
136 136 )
137 137 % b', '.join(sorted(missingreqs))
138 138 )
139 139 pullop.repo.ui.warn(
140 140 _(
141 141 b'(see https://www.mercurial-scm.org/wiki/MissingRequirement '
142 142 b'for more information)\n'
143 143 )
144 144 )
145 145 return False, None
146 146 requirements = streamreqs
147 147
148 148 return True, requirements
149 149
150 150
151 151 def maybeperformlegacystreamclone(pullop):
152 152 """Possibly perform a legacy stream clone operation.
153 153
154 154 Legacy stream clones are performed as part of pull but before all other
155 155 operations.
156 156
157 157 A legacy stream clone will not be performed if a bundle2 stream clone is
158 158 supported.
159 159 """
160 160 from . import localrepo
161 161
162 162 supported, requirements = canperformstreamclone(pullop)
163 163
164 164 if not supported:
165 165 return
166 166
167 167 repo = pullop.repo
168 168 remote = pullop.remote
169 169
170 170 # Save remote branchmap. We will use it later to speed up branchcache
171 171 # creation.
172 172 rbranchmap = None
173 173 if remote.capable(b'branchmap'):
174 174 with remote.commandexecutor() as e:
175 175 rbranchmap = e.callcommand(b'branchmap', {}).result()
176 176
177 177 repo.ui.status(_(b'streaming all changes\n'))
178 178
179 179 with remote.commandexecutor() as e:
180 180 fp = e.callcommand(b'stream_out', {}).result()
181 181
182 182 # TODO strictly speaking, this code should all be inside the context
183 183 # manager because the context manager is supposed to ensure all wire state
184 184 # is flushed when exiting. But the legacy peers don't do this, so it
185 185 # doesn't matter.
186 186 l = fp.readline()
187 187 try:
188 188 resp = int(l)
189 189 except ValueError:
190 190 raise error.ResponseError(
191 191 _(b'unexpected response from remote server:'), l
192 192 )
193 193 if resp == 1:
194 194 raise error.Abort(_(b'operation forbidden by server'))
195 195 elif resp == 2:
196 196 raise error.Abort(_(b'locking the remote repository failed'))
197 197 elif resp != 0:
198 198 raise error.Abort(_(b'the server sent an unknown error code'))
199 199
200 200 l = fp.readline()
201 201 try:
202 202 filecount, bytecount = map(int, l.split(b' ', 1))
203 203 except (ValueError, TypeError):
204 204 raise error.ResponseError(
205 205 _(b'unexpected response from remote server:'), l
206 206 )
207 207
208 208 with repo.lock():
209 209 consumev1(repo, fp, filecount, bytecount)
210 210 repo.requirements = new_stream_clone_requirements(
211 211 repo.requirements,
212 212 requirements,
213 213 )
214 214 repo.svfs.options = localrepo.resolvestorevfsoptions(
215 215 repo.ui, repo.requirements, repo.features
216 216 )
217 217 scmutil.writereporequirements(repo)
218 218 nodemap.post_stream_cleanup(repo)
219 219
220 220 if rbranchmap:
221 221 repo._branchcaches.replace(repo, rbranchmap)
222 222
223 223 repo.invalidate()
224 224
225 225
226 226 def allowservergeneration(repo):
227 227 """Whether streaming clones are allowed from the server."""
228 228 if repository.REPO_FEATURE_STREAM_CLONE not in repo.features:
229 229 return False
230 230
231 231 if not repo.ui.configbool(b'server', b'uncompressed', untrusted=True):
232 232 return False
233 233
234 234 # The way stream clone works makes it impossible to hide secret changesets.
235 235 # So don't allow this by default.
236 236 secret = phases.hassecret(repo)
237 237 if secret:
238 238 return repo.ui.configbool(b'server', b'uncompressedallowsecret')
239 239
240 240 return True
241 241
242 242
243 243 # This is its own function so extensions can override it.
244 def _walkstreamfiles(repo, matcher=None, phase=False):
245 return repo.store.walk(matcher, phase=phase)
244 def _walkstreamfiles(repo, matcher=None, phase=False, obsolescence=False):
245 return repo.store.walk(matcher, phase=phase, obsolescence=obsolescence)
246 246
247 247
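Since `_walkstreamfiles` is a plain module-level function, an extension can wrap it to adjust which store entries get streamed. A minimal sketch, assuming the extension imports `streamclone` and registers the wrapper in its `extsetup`; the pass-through wrapper body is purely illustrative.

from mercurial import extensions, streamclone

def _wrappedwalk(orig, repo, matcher=None, phase=False, obsolescence=False):
    # delegate to the original walk; a real extension could filter or
    # augment the entries it yields here
    for entry in orig(repo, matcher, phase=phase, obsolescence=obsolescence):
        yield entry

def extsetup(ui):
    extensions.wrapfunction(streamclone, b'_walkstreamfiles', _wrappedwalk)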
248 248 def generatev1(repo):
249 249 """Emit content for version 1 of a streaming clone.
250 250
251 251 This returns a 3-tuple of (file count, byte size, data iterator).
252 252
253 253 The data iterator consists of N entries for each file being transferred.
254 254 Each file entry starts as a line with the file name and integer size
255 255 delimited by a null byte.
256 256
257 257 The raw file data follows. Following the raw file data is the next file
258 258 entry, or EOF.
259 259
260 260 When used on the wire protocol, an additional line indicating protocol
261 261 success will be prepended to the stream. This function is not responsible
262 262 for adding it.
263 263
264 264 This function will obtain a repository lock to ensure a consistent view of
265 265 the store is captured. It therefore may raise LockError.
266 266 """
267 267 entries = []
268 268 total_bytes = 0
269 269 # Get consistent snapshot of repo, lock during scan.
270 270 with repo.lock():
271 271 repo.ui.debug(b'scanning\n')
272 272 for entry in _walkstreamfiles(repo):
273 273 for f in entry.files():
274 274 file_size = f.file_size(repo.store.vfs)
275 275 if file_size:
276 276 entries.append((f.unencoded_path, file_size))
277 277 total_bytes += file_size
278 278 _test_sync_point_walk_1(repo)
279 279 _test_sync_point_walk_2(repo)
280 280
281 281 repo.ui.debug(
282 282 b'%d files, %d bytes to transfer\n' % (len(entries), total_bytes)
283 283 )
284 284
285 285 svfs = repo.svfs
286 286 debugflag = repo.ui.debugflag
287 287
288 288 def emitrevlogdata():
289 289 for name, size in entries:
290 290 if debugflag:
291 291 repo.ui.debug(b'sending %s (%d bytes)\n' % (name, size))
292 292 # partially encode name over the wire for backwards compat
293 293 yield b'%s\0%d\n' % (store.encodedir(name), size)
294 294 # auditing at this stage is both pointless (paths are already
295 295 # trusted by the local repo) and expensive
296 296 with svfs(name, b'rb', auditpath=False) as fp:
297 297 if size <= 65536:
298 298 yield fp.read(size)
299 299 else:
300 300 for chunk in util.filechunkiter(fp, limit=size):
301 301 yield chunk
302 302
303 303 return len(entries), total_bytes, emitrevlogdata()
304 304
305 305
306 306 def generatev1wireproto(repo):
307 307 """Emit content for version 1 of streaming clone suitable for the wire.
308 308
309 309 This is the data output from ``generatev1()`` with 2 header lines. The
310 310 first line indicates overall success. The 2nd contains the file count and
311 311 byte size of payload.
312 312
313 313 The success line contains "0" for success, "1" for stream generation not
314 314 allowed, and "2" for error locking the repository (possibly indicating
315 315 a permissions error for the server process).
316 316 """
317 317 if not allowservergeneration(repo):
318 318 yield b'1\n'
319 319 return
320 320
321 321 try:
322 322 filecount, bytecount, it = generatev1(repo)
323 323 except error.LockError:
324 324 yield b'2\n'
325 325 return
326 326
327 327 # Indicates successful response.
328 328 yield b'0\n'
329 329 yield b'%d %d\n' % (filecount, bytecount)
330 330 for chunk in it:
331 331 yield chunk
332 332
333 333
334 334 def generatebundlev1(repo, compression=b'UN'):
335 335 """Emit content for version 1 of a stream clone bundle.
336 336
337 337 The first 4 bytes of the output ("HGS1") denote this as stream clone
338 338 bundle version 1.
339 339
340 340 The next 2 bytes indicate the compression type. Only "UN" is currently
341 341 supported.
342 342
343 343 The next 16 bytes are two 64-bit big endian unsigned integers indicating
344 344 file count and byte count, respectively.
345 345
346 346 The next 2 bytes are a 16-bit big endian unsigned short declaring the length
347 347 of the requirements string, including a trailing \0. The following N bytes
348 348 are the requirements string, which is ASCII containing a comma-delimited
349 349 list of repo requirements that are needed to support the data.
350 350
351 351 The remaining content is the output of ``generatev1()`` (which may be
352 352 compressed in the future).
353 353
354 354 Returns a tuple of (requirements, data generator).
355 355 """
356 356 if compression != b'UN':
357 357 raise ValueError(b'we do not support the compression argument yet')
358 358
359 359 requirements = streamed_requirements(repo)
360 360 requires = b','.join(sorted(requirements))
361 361
362 362 def gen():
363 363 yield b'HGS1'
364 364 yield compression
365 365
366 366 filecount, bytecount, it = generatev1(repo)
367 367 repo.ui.status(
368 368 _(b'writing %d bytes for %d files\n') % (bytecount, filecount)
369 369 )
370 370
371 371 yield struct.pack(b'>QQ', filecount, bytecount)
372 372 yield struct.pack(b'>H', len(requires) + 1)
373 373 yield requires + b'\0'
374 374
375 375 # This is where we'll add compression in the future.
376 376 assert compression == b'UN'
377 377
378 378 progress = repo.ui.makeprogress(
379 379 _(b'bundle'), total=bytecount, unit=_(b'bytes')
380 380 )
381 381 progress.update(0)
382 382
383 383 for chunk in it:
384 384 progress.increment(step=len(chunk))
385 385 yield chunk
386 386
387 387 progress.complete()
388 388
389 389 return requirements, gen()
390 390
391 391
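To make the header layout described in the docstring concrete, here is what the fixed-size prefix of a hypothetical uncompressed bundle would look like; the file count, byte count and requirements below are made-up values, not the output of any real repository.

import struct

requires = b'generaldelta,revlogv1'
header = (
    b'HGS1'                                  # 4-byte stream clone bundle magic
    + b'UN'                                  # 2-byte compression type (uncompressed)
    + struct.pack(b'>QQ', 3, 1234)           # 64-bit file count and byte count
    + struct.pack(b'>H', len(requires) + 1)  # requirements length incl. trailing NUL
    + requires + b'\0'                       # ASCII, comma-delimited requirements
)
# the generatev1() payload (file entries and raw data) would follow this header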
392 392 def consumev1(repo, fp, filecount, bytecount):
393 393 """Apply the contents from version 1 of a streaming clone file handle.
394 394
395 395 This takes the output from "stream_out" and applies it to the specified
396 396 repository.
397 397
398 398 Like "stream_out," the status line added by the wire protocol is not
399 399 handled by this function.
400 400 """
401 401 with repo.lock():
402 402 repo.ui.status(
403 403 _(b'%d files to transfer, %s of data\n')
404 404 % (filecount, util.bytecount(bytecount))
405 405 )
406 406 progress = repo.ui.makeprogress(
407 407 _(b'clone'), total=bytecount, unit=_(b'bytes')
408 408 )
409 409 progress.update(0)
410 410 start = util.timer()
411 411
412 412 # TODO: get rid of (potential) inconsistency
413 413 #
414 414 # If transaction is started and any @filecache property is
415 415 # changed at this point, it causes inconsistency between
416 416 # in-memory cached property and streamclone-ed file on the
417 417 # disk. Nested transaction prevents transaction scope "clone"
418 418 # below from writing in-memory changes out at the end of it,
419 419 # even though in-memory changes are discarded at the end of it
420 420 # regardless of transaction nesting.
421 421 #
422 422 # But transaction nesting can't be simply prohibited, because
423 423 # nesting occurs also in ordinary case (e.g. enabling
424 424 # clonebundles).
425 425
426 426 with repo.transaction(b'clone'):
427 427 with repo.svfs.backgroundclosing(repo.ui, expectedcount=filecount):
428 428 for i in range(filecount):
429 429 # XXX doesn't support '\n' or '\r' in filenames
430 430 l = fp.readline()
431 431 try:
432 432 name, size = l.split(b'\0', 1)
433 433 size = int(size)
434 434 except (ValueError, TypeError):
435 435 raise error.ResponseError(
436 436 _(b'unexpected response from remote server:'), l
437 437 )
438 438 if repo.ui.debugflag:
439 439 repo.ui.debug(
440 440 b'adding %s (%s)\n' % (name, util.bytecount(size))
441 441 )
442 442 # for backwards compat, name was partially encoded
443 443 path = store.decodedir(name)
444 444 with repo.svfs(path, b'w', backgroundclose=True) as ofp:
445 445 for chunk in util.filechunkiter(fp, limit=size):
446 446 progress.increment(step=len(chunk))
447 447 ofp.write(chunk)
448 448
449 449 # force @filecache properties to be reloaded from
450 450 # streamclone-ed file at next access
451 451 repo.invalidate(clearfilecache=True)
452 452
453 453 elapsed = util.timer() - start
454 454 if elapsed <= 0:
455 455 elapsed = 0.001
456 456 progress.complete()
457 457 repo.ui.status(
458 458 _(b'transferred %s in %.1f seconds (%s/sec)\n')
459 459 % (
460 460 util.bytecount(bytecount),
461 461 elapsed,
462 462 util.bytecount(bytecount / elapsed),
463 463 )
464 464 )
465 465
466 466
467 467 def readbundle1header(fp):
468 468 compression = fp.read(2)
469 469 if compression != b'UN':
470 470 raise error.Abort(
471 471 _(
472 472 b'only uncompressed stream clone bundles are '
473 473 b'supported; got %s'
474 474 )
475 475 % compression
476 476 )
477 477
478 478 filecount, bytecount = struct.unpack(b'>QQ', fp.read(16))
479 479 requireslen = struct.unpack(b'>H', fp.read(2))[0]
480 480 requires = fp.read(requireslen)
481 481
482 482 if not requires.endswith(b'\0'):
483 483 raise error.Abort(
484 484 _(
485 485 b'malformed stream clone bundle: '
486 486 b'requirements not properly encoded'
487 487 )
488 488 )
489 489
490 490 requirements = set(requires.rstrip(b'\0').split(b','))
491 491
492 492 return filecount, bytecount, requirements
493 493
494 494
495 495 def applybundlev1(repo, fp):
496 496 """Apply the content from a stream clone bundle version 1.
497 497
498 498 We assume the 4 byte header has been read and validated and the file handle
499 499 is at the 2 byte compression identifier.
500 500 """
501 501 if len(repo):
502 502 raise error.Abort(
503 503 _(b'cannot apply stream clone bundle on non-empty repo')
504 504 )
505 505
506 506 filecount, bytecount, requirements = readbundle1header(fp)
507 507 missingreqs = requirements - repo.supported
508 508 if missingreqs:
509 509 raise error.Abort(
510 510 _(b'unable to apply stream clone: unsupported format: %s')
511 511 % b', '.join(sorted(missingreqs))
512 512 )
513 513
514 514 consumev1(repo, fp, filecount, bytecount)
515 515 nodemap.post_stream_cleanup(repo)
516 516
517 517
518 518 class streamcloneapplier:
519 519 """Class to manage applying streaming clone bundles.
520 520
521 521 We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
522 522 readers to perform bundle type-specific functionality.
523 523 """
524 524
525 525 def __init__(self, fh):
526 526 self._fh = fh
527 527
528 528 def apply(self, repo):
529 529 return applybundlev1(repo, self._fh)
530 530
531 531
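A hedged sketch of the caller side these docstrings assume: the 4-byte magic is read and validated first, then the file object, now positioned at the compression identifier, is handed to `streamcloneapplier`. The function name is hypothetical; the real dispatch lives in the bundle-reading code.

def _sketch_apply_stream_bundle(repo, fp):
    # illustrative only; bundle detection normally happens in the exchange code
    if fp.read(4) != b'HGS1':
        raise error.Abort(_(b'not a stream clone bundle'))
    # fp is now at the 2-byte compression identifier, exactly where
    # applybundlev1() expects it (see its docstring above)
    return streamcloneapplier(fp).apply(repo)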
532 532 # type of file to stream
533 533 _fileappend = 0 # append only file
534 534 _filefull = 1 # full snapshot file
535 535
536 536 # Source of the file
537 537 _srcstore = b's' # store (svfs)
538 538 _srccache = b'c' # cache (cache)
539 539
540 540 # This is its own function so extensions can override it.
541 541 def _walkstreamfullstorefiles(repo):
542 542 """list snapshot file from the store"""
543 543 fnames = []
544 544 if not repo.publishing():
545 545 fnames.append(b'phaseroots')
546 546 return fnames
547 547
548 548
549 549 def _filterfull(entry, copy, vfsmap):
550 550 """actually copy the snapshot files"""
551 551 src, name, ftype, data = entry
552 552 if ftype != _filefull:
553 553 return entry
554 554 return (src, name, ftype, copy(vfsmap[src].join(name)))
555 555
556 556
557 557 @contextlib.contextmanager
558 558 def maketempcopies():
559 559 """return a function to temporary copy file"""
560 560
561 561 files = []
562 562 dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')
563 563 try:
564 564
565 565 def copy(src):
566 566 fd, dst = pycompat.mkstemp(
567 567 prefix=os.path.basename(src), dir=dst_dir
568 568 )
569 569 os.close(fd)
570 570 files.append(dst)
571 571 util.copyfiles(src, dst, hardlink=True)
572 572 return dst
573 573
574 574 yield copy
575 575 finally:
576 576 for tmp in files:
577 577 util.tryunlink(tmp)
578 578 util.tryrmdir(dst_dir)
579 579
580 580
581 581 def _makemap(repo):
582 582 """make a (src -> vfs) map for the repo"""
583 583 vfsmap = {
584 584 _srcstore: repo.svfs,
585 585 _srccache: repo.cachevfs,
586 586 }
587 587 # we keep repo.vfs out of the map on purpose, there are too many dangers there
588 588 # (eg: .hg/hgrc)
589 589 assert repo.vfs not in vfsmap.values()
590 590
591 591 return vfsmap
592 592
593 593
594 594 def _emit2(repo, entries, totalfilesize):
595 595 """actually emit the stream bundle"""
596 596 vfsmap = _makemap(repo)
597 597 # we keep repo.vfs out of the map on purpose, there are too many dangers there
598 598 # (eg: .hg/hgrc),
599 599 #
600 600 # this assert is duplicated (from _makemap) as an author might think this is
601 601 # fine, while this is really not fine.
602 602 if repo.vfs in vfsmap.values():
603 603 raise error.ProgrammingError(
604 604 b'repo.vfs must not be added to vfsmap for security reasons'
605 605 )
606 606
607 607 progress = repo.ui.makeprogress(
608 608 _(b'bundle'), total=totalfilesize, unit=_(b'bytes')
609 609 )
610 610 progress.update(0)
611 611 with maketempcopies() as copy, progress:
612 612 # copy is delayed until we are in the try
613 613 entries = [_filterfull(e, copy, vfsmap) for e in entries]
614 614 yield None # this releases the lock on the repository
615 615 totalbytecount = 0
616 616
617 617 for src, name, ftype, data in entries:
618 618 vfs = vfsmap[src]
619 619 yield src
620 620 yield util.uvarintencode(len(name))
621 621 if ftype == _fileappend:
622 622 fp = vfs(name)
623 623 size = data
624 624 elif ftype == _filefull:
625 625 fp = open(data, b'rb')
626 626 size = util.fstat(fp).st_size
627 627 bytecount = 0
628 628 try:
629 629 yield util.uvarintencode(size)
630 630 yield name
631 631 if size <= 65536:
632 632 chunks = (fp.read(size),)
633 633 else:
634 634 chunks = util.filechunkiter(fp, limit=size)
635 635 for chunk in chunks:
636 636 bytecount += len(chunk)
637 637 totalbytecount += len(chunk)
638 638 progress.update(totalbytecount)
639 639 yield chunk
640 640 if bytecount != size:
641 641 # Would most likely be caused by a race due to `hg strip` or
642 642 # a revlog split
643 643 raise error.Abort(
644 644 _(
645 645 b'clone could only read %d bytes from %s, but '
646 646 b'expected %d bytes'
647 647 )
648 648 % (bytecount, name, size)
649 649 )
650 650 finally:
651 651 fp.close()
652 652
653 653
654 654 def _test_sync_point_walk_1(repo):
655 655 """a function for synchronisation during tests"""
656 656
657 657
658 658 def _test_sync_point_walk_2(repo):
659 659 """a function for synchronisation during tests"""
660 660
661 661
662 662 def _v2_walk(repo, includes, excludes, includeobsmarkers):
663 663 """emit a seris of files information useful to clone a repo
664 664
665 665 return (entries, totalfilesize)
666 666
667 667 entries is a list of tuple (vfs-key, file-path, file-type, size)
668 668
669 669 - `vfs-key`: is a key to the right vfs to write the file (see _makemap)
670 670 - `name`: file path of the file to copy (to be fed to the vfs)
671 671 - `file-type`: does this file need to be copied with the source lock?
672 672 - `size`: the size of the file (or None)
673 673 """
674 674 assert repo._currentlock(repo._lockref) is not None
675 entries = []
675 files = []
676 676 totalfilesize = 0
677 677
678 678 matcher = None
679 679 if includes or excludes:
680 680 matcher = narrowspec.match(repo.root, includes, excludes)
681 681
682 682 phase = not repo.publishing()
683 for entry in _walkstreamfiles(repo, matcher, phase=phase):
683 entries = _walkstreamfiles(
684 repo, matcher, phase=phase, obsolescence=includeobsmarkers
685 )
686 for entry in entries:
684 687 for f in entry.files():
685 688 file_size = f.file_size(repo.store.vfs)
686 689 if file_size:
687 690 ft = _fileappend
688 691 if f.is_volatile:
689 692 ft = _filefull
690 entries.append((_srcstore, f.unencoded_path, ft, file_size))
693 files.append((_srcstore, f.unencoded_path, ft, file_size))
691 694 totalfilesize += file_size
692 if includeobsmarkers and repo.svfs.exists(b'obsstore'):
693 totalfilesize += repo.svfs.lstat(b'obsstore').st_size
694 entries.append((_srcstore, b'obsstore', _filefull, None))
695 695 for name in cacheutil.cachetocopy(repo):
696 696 if repo.cachevfs.exists(name):
697 697 totalfilesize += repo.cachevfs.lstat(name).st_size
698 entries.append((_srccache, name, _filefull, None))
699 return entries, totalfilesize
698 files.append((_srccache, name, _filefull, None))
699 return files, totalfilesize
700 700
701 701
702 702 def generatev2(repo, includes, excludes, includeobsmarkers):
703 703 """Emit content for version 2 of a streaming clone.
704 704
705 705 the data stream consists of the following entries:
706 706 1) A char representing the file destination (eg: store or cache)
707 707 2) A varint containing the length of the filename
708 708 3) A varint containing the length of file data
709 709 4) N bytes containing the filename (the internal, store-agnostic form)
710 710 5) N bytes containing the file data
711 711
712 712 Returns a 3-tuple of (file count, file size, data iterator).
713 713 """
714 714
715 715 with repo.lock():
716 716
717 717 repo.ui.debug(b'scanning\n')
718 718
719 719 entries, totalfilesize = _v2_walk(
720 720 repo,
721 721 includes=includes,
722 722 excludes=excludes,
723 723 includeobsmarkers=includeobsmarkers,
724 724 )
725 725
726 726 chunks = _emit2(repo, entries, totalfilesize)
727 727 first = next(chunks)
728 728 assert first is None
729 729 _test_sync_point_walk_1(repo)
730 730 _test_sync_point_walk_2(repo)
731 731
732 732 return len(entries), totalfilesize, chunks
733 733
734 734
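The per-entry framing in the docstring can be exercised in isolation. The sketch below encodes one entry the way `_emit2` emits it and reads it back the way `consumev2` does, using the same `util` helpers; the destination key, file name and data are invented for the example.

import io
from mercurial import util

def _sketch_roundtrip_v2_entry():
    name, data = b'data/foo.i', b'some revlog bytes'
    # encode: destination char, varint name length, varint data length,
    # then the file name and the raw data (the order listed in the docstring)
    frame = (
        b's'
        + util.uvarintencode(len(name))
        + util.uvarintencode(len(data))
        + name
        + data
    )
    fp = io.BytesIO(frame)
    src = util.readexactly(fp, 1)
    namelen = util.uvarintdecodestream(fp)
    datalen = util.uvarintdecodestream(fp)
    assert util.readexactly(fp, namelen) == name
    assert util.readexactly(fp, datalen) == data
    return src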
735 735 @contextlib.contextmanager
736 736 def nested(*ctxs):
737 737 this = ctxs[0]
738 738 rest = ctxs[1:]
739 739 with this:
740 740 if rest:
741 741 with nested(*rest):
742 742 yield
743 743 else:
744 744 yield
745 745
746 746
747 747 def consumev2(repo, fp, filecount, filesize):
748 748 """Apply the contents from a version 2 streaming clone.
749 749
750 750 Data is read from an object that only needs to provide a ``read(size)``
751 751 method.
752 752 """
753 753 with repo.lock():
754 754 repo.ui.status(
755 755 _(b'%d files to transfer, %s of data\n')
756 756 % (filecount, util.bytecount(filesize))
757 757 )
758 758
759 759 start = util.timer()
760 760 progress = repo.ui.makeprogress(
761 761 _(b'clone'), total=filesize, unit=_(b'bytes')
762 762 )
763 763 progress.update(0)
764 764
765 765 vfsmap = _makemap(repo)
766 766 # we keep repo.vfs out of the map on purpose, there are too many dangers
767 767 # there (eg: .hg/hgrc),
768 768 #
769 769 # this assert is duplicated (from _makemap) as an author might think this
770 770 # is fine, while this is really not fine.
771 771 if repo.vfs in vfsmap.values():
772 772 raise error.ProgrammingError(
773 773 b'repo.vfs must not be added to vfsmap for security reasons'
774 774 )
775 775
776 776 with repo.transaction(b'clone'):
777 777 ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values())
778 778 with nested(*ctxs):
779 779 for i in range(filecount):
780 780 src = util.readexactly(fp, 1)
781 781 vfs = vfsmap[src]
782 782 namelen = util.uvarintdecodestream(fp)
783 783 datalen = util.uvarintdecodestream(fp)
784 784
785 785 name = util.readexactly(fp, namelen)
786 786
787 787 if repo.ui.debugflag:
788 788 repo.ui.debug(
789 789 b'adding [%s] %s (%s)\n'
790 790 % (src, name, util.bytecount(datalen))
791 791 )
792 792
793 793 with vfs(name, b'w') as ofp:
794 794 for chunk in util.filechunkiter(fp, limit=datalen):
795 795 progress.increment(step=len(chunk))
796 796 ofp.write(chunk)
797 797
798 798 # force @filecache properties to be reloaded from
799 799 # streamclone-ed file at next access
800 800 repo.invalidate(clearfilecache=True)
801 801
802 802 elapsed = util.timer() - start
803 803 if elapsed <= 0:
804 804 elapsed = 0.001
805 805 repo.ui.status(
806 806 _(b'transferred %s in %.1f seconds (%s/sec)\n')
807 807 % (
808 808 util.bytecount(progress.pos),
809 809 elapsed,
810 810 util.bytecount(progress.pos / elapsed),
811 811 )
812 812 )
813 813 progress.complete()
814 814
815 815
816 816 def applybundlev2(repo, fp, filecount, filesize, requirements):
817 817 from . import localrepo
818 818
819 819 missingreqs = [r for r in requirements if r not in repo.supported]
820 820 if missingreqs:
821 821 raise error.Abort(
822 822 _(b'unable to apply stream clone: unsupported format: %s')
823 823 % b', '.join(sorted(missingreqs))
824 824 )
825 825
826 826 consumev2(repo, fp, filecount, filesize)
827 827
828 828 repo.requirements = new_stream_clone_requirements(
829 829 repo.requirements,
830 830 requirements,
831 831 )
832 832 repo.svfs.options = localrepo.resolvestorevfsoptions(
833 833 repo.ui, repo.requirements, repo.features
834 834 )
835 835 scmutil.writereporequirements(repo)
836 836 nodemap.post_stream_cleanup(repo)
837 837
838 838
839 839 def _copy_files(src_vfs_map, dst_vfs_map, entries, progress):
840 840 hardlink = [True]
841 841
842 842 def copy_used():
843 843 hardlink[0] = False
844 844 progress.topic = _(b'copying')
845 845
846 846 for k, path, size in entries:
847 847 src_vfs = src_vfs_map[k]
848 848 dst_vfs = dst_vfs_map[k]
849 849 src_path = src_vfs.join(path)
850 850 dst_path = dst_vfs.join(path)
851 851 # We cannot use dirname and makedirs of dst_vfs here because the store
852 852 # encoding confuses them. See issue 6581 for details.
853 853 dirname = os.path.dirname(dst_path)
854 854 if not os.path.exists(dirname):
855 855 util.makedirs(dirname)
856 856 dst_vfs.register_file(path)
857 857 # XXX we could use the #nb_bytes argument.
858 858 util.copyfile(
859 859 src_path,
860 860 dst_path,
861 861 hardlink=hardlink[0],
862 862 no_hardlink_cb=copy_used,
863 863 check_fs_hardlink=False,
864 864 )
865 865 progress.increment()
866 866 return hardlink[0]
867 867
868 868
869 869 def local_copy(src_repo, dest_repo):
870 870 """copy all content from one local repository to another
871 871
872 872 This is useful for local clone"""
873 873 src_store_requirements = {
874 874 r
875 875 for r in src_repo.requirements
876 876 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
877 877 }
878 878 dest_store_requirements = {
879 879 r
880 880 for r in dest_repo.requirements
881 881 if r not in requirementsmod.WORKING_DIR_REQUIREMENTS
882 882 }
883 883 assert src_store_requirements == dest_store_requirements
884 884
885 885 with dest_repo.lock():
886 886 with src_repo.lock():
887 887
888 888 # bookmarks are not integrated into the streaming as they might use the
889 889 # `repo.vfs`, and there is too much sensitive data accessible
890 890 # through `repo.vfs` to expose it to streaming clone.
891 891 src_book_vfs = bookmarks.bookmarksvfs(src_repo)
892 892 srcbookmarks = src_book_vfs.join(b'bookmarks')
893 893 bm_count = 0
894 894 if os.path.exists(srcbookmarks):
895 895 bm_count = 1
896 896
897 897 entries, totalfilesize = _v2_walk(
898 898 src_repo,
899 899 includes=None,
900 900 excludes=None,
901 901 includeobsmarkers=True,
902 902 )
903 903 src_vfs_map = _makemap(src_repo)
904 904 dest_vfs_map = _makemap(dest_repo)
905 905 progress = src_repo.ui.makeprogress(
906 906 topic=_(b'linking'),
907 907 total=len(entries) + bm_count,
908 908 unit=_(b'files'),
909 909 )
910 910 # copy files
911 911 #
912 912 # We could copy the full file while the source repository is locked
913 913 # and the other one without the lock. However, in the linking case,
914 914 # this would also require checks that nobody is appending any data
915 915 # to the files while we do the clone, so this is not done yet. We
916 916 # could do this blindly when copying files.
917 917 files = ((k, path, size) for k, path, ftype, size in entries)
918 918 hardlink = _copy_files(src_vfs_map, dest_vfs_map, files, progress)
919 919
920 920 # copy bookmarks over
921 921 if bm_count:
922 922 dst_book_vfs = bookmarks.bookmarksvfs(dest_repo)
923 923 dstbookmarks = dst_book_vfs.join(b'bookmarks')
924 924 util.copyfile(srcbookmarks, dstbookmarks)
925 925 progress.complete()
926 926 if hardlink:
927 927 msg = b'linked %d files\n'
928 928 else:
929 929 msg = b'copied %d files\n'
930 930 src_repo.ui.debug(msg % (len(entries) + bm_count))
931 931
932 932 with dest_repo.transaction(b"localclone") as tr:
933 933 dest_repo.store.write(tr)
934 934
935 935 # clean up transaction files as they do not make sense
936 936 transaction.cleanup_undo_files(dest_repo.ui.warn, dest_repo.vfs_map)