py3: use node.hex(h.digest()) instead of h.hexdigest()...
Pulkit Goyal
r40648:3fa4183e default
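
Editorial note (not part of the commit): the diff below swaps hashlib's hexdigest() for node.hex() applied to digest(). On Python 3, hexdigest() returns a unicode str, while Mercurial passes hashes around as bytes; node.hex(), which behaves like binascii.hexlify(), keeps the result as bytes on both Python 2 and 3. The snippet is a minimal sketch of that difference only; the variable names are illustrative.

    import hashlib
    from binascii import hexlify  # stands in here for mercurial.node.hex

    data = b'path/to/file'
    # str on Python 3 (unicode text), str/bytes on Python 2
    text_digest = hashlib.sha1(data).hexdigest()
    # bytes on both Python 2 and Python 3
    byte_digest = hexlify(hashlib.sha1(data).digest())
    assert byte_digest == text_digest.encode('ascii')
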
@@ -1,377 +1,378 b''
1 1 # debugcommands.py - debug logic for remotefilelog
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import hashlib
10 10 import os
11 11 import zlib
12 12
13 13 from mercurial.node import bin, hex, nullid, short
14 14 from mercurial.i18n import _
15 15 from mercurial import (
16 16 error,
17 17 filelog,
18 node as nodemod,
18 19 revlog,
19 20 )
20 21 from . import (
21 22 constants,
22 23 datapack,
23 24 extutil,
24 25 fileserverclient,
25 26 historypack,
26 27 repack,
27 28 shallowutil,
28 29 )
29 30
30 31 def debugremotefilelog(ui, path, **opts):
31 32 decompress = opts.get(r'decompress')
32 33
33 34 size, firstnode, mapping = parsefileblob(path, decompress)
34 35
35 36 ui.status(_("size: %s bytes\n") % (size))
36 37 ui.status(_("path: %s \n") % (path))
37 38 ui.status(_("key: %s \n") % (short(firstnode)))
38 39 ui.status(_("\n"))
39 40 ui.status(_("%12s => %12s %13s %13s %12s\n") %
40 41 ("node", "p1", "p2", "linknode", "copyfrom"))
41 42
42 43 queue = [firstnode]
43 44 while queue:
44 45 node = queue.pop(0)
45 46 p1, p2, linknode, copyfrom = mapping[node]
46 47 ui.status(_("%s => %s %s %s %s\n") %
47 48 (short(node), short(p1), short(p2), short(linknode), copyfrom))
48 49 if p1 != nullid:
49 50 queue.append(p1)
50 51 if p2 != nullid:
51 52 queue.append(p2)
52 53
53 54 def buildtemprevlog(repo, file):
54 55 # get filename key
55 filekey = hashlib.sha1(file).hexdigest()
56 filekey = nodemod.hex(hashlib.sha1(file).digest())
56 57 filedir = os.path.join(repo.path, 'store/data', filekey)
57 58
58 59 # sort all entries based on linkrev
59 60 fctxs = []
60 61 for filenode in os.listdir(filedir):
61 62 if '_old' not in filenode:
62 63 fctxs.append(repo.filectx(file, fileid=bin(filenode)))
63 64
64 65 fctxs = sorted(fctxs, key=lambda x: x.linkrev())
65 66
66 67 # add to revlog
67 68 temppath = repo.sjoin('data/temprevlog.i')
68 69 if os.path.exists(temppath):
69 70 os.remove(temppath)
70 71 r = filelog.filelog(repo.svfs, 'temprevlog')
71 72
72 73 class faket(object):
73 74 def add(self, a, b, c):
74 75 pass
75 76 t = faket()
76 77 for fctx in fctxs:
77 78 if fctx.node() not in repo:
78 79 continue
79 80
80 81 p = fctx.filelog().parents(fctx.filenode())
81 82 meta = {}
82 83 if fctx.renamed():
83 84 meta['copy'] = fctx.renamed()[0]
84 85 meta['copyrev'] = hex(fctx.renamed()[1])
85 86
86 87 r.add(fctx.data(), meta, t, fctx.linkrev(), p[0], p[1])
87 88
88 89 return r
89 90
90 91 def debugindex(orig, ui, repo, file_=None, **opts):
91 92 """dump the contents of an index file"""
92 93 if (opts.get(r'changelog') or
93 94 opts.get(r'manifest') or
94 95 opts.get(r'dir') or
95 96 not shallowutil.isenabled(repo) or
96 97 not repo.shallowmatch(file_)):
97 98 return orig(ui, repo, file_, **opts)
98 99
99 100 r = buildtemprevlog(repo, file_)
100 101
101 102 # debugindex like normal
102 103 format = opts.get('format', 0)
103 104 if format not in (0, 1):
104 105 raise error.Abort(_("unknown format %d") % format)
105 106
106 107 generaldelta = r.version & revlog.FLAG_GENERALDELTA
107 108 if generaldelta:
108 109 basehdr = ' delta'
109 110 else:
110 111 basehdr = ' base'
111 112
112 113 if format == 0:
113 114 ui.write((" rev offset length " + basehdr + " linkrev"
114 115 " nodeid p1 p2\n"))
115 116 elif format == 1:
116 117 ui.write((" rev flag offset length"
117 118 " size " + basehdr + " link p1 p2"
118 119 " nodeid\n"))
119 120
120 121 for i in r:
121 122 node = r.node(i)
122 123 if generaldelta:
123 124 base = r.deltaparent(i)
124 125 else:
125 126 base = r.chainbase(i)
126 127 if format == 0:
127 128 try:
128 129 pp = r.parents(node)
129 130 except Exception:
130 131 pp = [nullid, nullid]
131 132 ui.write("% 6d % 9d % 7d % 6d % 7d %s %s %s\n" % (
132 133 i, r.start(i), r.length(i), base, r.linkrev(i),
133 134 short(node), short(pp[0]), short(pp[1])))
134 135 elif format == 1:
135 136 pr = r.parentrevs(i)
136 137 ui.write("% 6d %04x % 8d % 8d % 8d % 6d % 6d % 6d % 6d %s\n" % (
137 138 i, r.flags(i), r.start(i), r.length(i), r.rawsize(i),
138 139 base, r.linkrev(i), pr[0], pr[1], short(node)))
139 140
140 141 def debugindexdot(orig, ui, repo, file_):
141 142 """dump an index DAG as a graphviz dot file"""
142 143 if not shallowutil.isenabled(repo):
143 144 return orig(ui, repo, file_)
144 145
145 146 r = buildtemprevlog(repo, os.path.basename(file_)[:-2])
146 147
147 148 ui.write(("digraph G {\n"))
148 149 for i in r:
149 150 node = r.node(i)
150 151 pp = r.parents(node)
151 152 ui.write("\t%d -> %d\n" % (r.rev(pp[0]), i))
152 153 if pp[1] != nullid:
153 154 ui.write("\t%d -> %d\n" % (r.rev(pp[1]), i))
154 155 ui.write("}\n")
155 156
156 157 def verifyremotefilelog(ui, path, **opts):
157 158 decompress = opts.get(r'decompress')
158 159
159 160 for root, dirs, files in os.walk(path):
160 161 for file in files:
161 162 if file == "repos":
162 163 continue
163 164 filepath = os.path.join(root, file)
164 165 size, firstnode, mapping = parsefileblob(filepath, decompress)
165 166 for p1, p2, linknode, copyfrom in mapping.itervalues():
166 167 if linknode == nullid:
167 168 actualpath = os.path.relpath(root, path)
168 169 key = fileserverclient.getcachekey("reponame", actualpath,
169 170 file)
170 171 ui.status("%s %s\n" % (key, os.path.relpath(filepath,
171 172 path)))
172 173
173 174 def _decompressblob(raw):
174 175 return zlib.decompress(raw)
175 176
176 177 def parsefileblob(path, decompress):
177 178 raw = None
178 179 f = open(path, "r")
179 180 try:
180 181 raw = f.read()
181 182 finally:
182 183 f.close()
183 184
184 185 if decompress:
185 186 raw = _decompressblob(raw)
186 187
187 188 offset, size, flags = shallowutil.parsesizeflags(raw)
188 189 start = offset + size
189 190
190 191 firstnode = None
191 192
192 193 mapping = {}
193 194 while start < len(raw):
194 195 divider = raw.index('\0', start + 80)
195 196
196 197 currentnode = raw[start:(start + 20)]
197 198 if not firstnode:
198 199 firstnode = currentnode
199 200
200 201 p1 = raw[(start + 20):(start + 40)]
201 202 p2 = raw[(start + 40):(start + 60)]
202 203 linknode = raw[(start + 60):(start + 80)]
203 204 copyfrom = raw[(start + 80):divider]
204 205
205 206 mapping[currentnode] = (p1, p2, linknode, copyfrom)
206 207 start = divider + 1
207 208
208 209 return size, firstnode, mapping
209 210
210 211 def debugdatapack(ui, *paths, **opts):
211 212 for path in paths:
212 213 if '.data' in path:
213 214 path = path[:path.index('.data')]
214 215 ui.write("%s:\n" % path)
215 216 dpack = datapack.datapack(path)
216 217 node = opts.get(r'node')
217 218 if node:
218 219 deltachain = dpack.getdeltachain('', bin(node))
219 220 dumpdeltachain(ui, deltachain, **opts)
220 221 return
221 222
222 223 if opts.get(r'long'):
223 224 hashformatter = hex
224 225 hashlen = 42
225 226 else:
226 227 hashformatter = short
227 228 hashlen = 14
228 229
229 230 lastfilename = None
230 231 totaldeltasize = 0
231 232 totalblobsize = 0
232 233 def printtotals():
233 234 if lastfilename is not None:
234 235 ui.write("\n")
235 236 if not totaldeltasize or not totalblobsize:
236 237 return
237 238 difference = totalblobsize - totaldeltasize
238 239 deltastr = "%0.1f%% %s" % (
239 240 (100.0 * abs(difference) / totalblobsize),
240 241 ("smaller" if difference > 0 else "bigger"))
241 242
242 243 ui.write(("Total:%s%s %s (%s)\n") % (
243 244 "".ljust(2 * hashlen - len("Total:")),
244 245 str(totaldeltasize).ljust(12),
245 246 str(totalblobsize).ljust(9),
246 247 deltastr
247 248 ))
248 249
249 250 bases = {}
250 251 nodes = set()
251 252 failures = 0
252 253 for filename, node, deltabase, deltalen in dpack.iterentries():
253 254 bases[node] = deltabase
254 255 if node in nodes:
255 256 ui.write(("Bad entry: %s appears twice\n" % short(node)))
256 257 failures += 1
257 258 nodes.add(node)
258 259 if filename != lastfilename:
259 260 printtotals()
260 261 name = '(empty name)' if filename == '' else filename
261 262 ui.write("%s:\n" % name)
262 263 ui.write("%s%s%s%s\n" % (
263 264 "Node".ljust(hashlen),
264 265 "Delta Base".ljust(hashlen),
265 266 "Delta Length".ljust(14),
266 267 "Blob Size".ljust(9)))
267 268 lastfilename = filename
268 269 totalblobsize = 0
269 270 totaldeltasize = 0
270 271
271 272 # Metadata could be missing, in which case it will be an empty dict.
272 273 meta = dpack.getmeta(filename, node)
273 274 if constants.METAKEYSIZE in meta:
274 275 blobsize = meta[constants.METAKEYSIZE]
275 276 totaldeltasize += deltalen
276 277 totalblobsize += blobsize
277 278 else:
278 279 blobsize = "(missing)"
279 280 ui.write("%s %s %s%s\n" % (
280 281 hashformatter(node),
281 282 hashformatter(deltabase),
282 283 str(deltalen).ljust(14),
283 284 blobsize))
284 285
285 286 if filename is not None:
286 287 printtotals()
287 288
288 289 failures += _sanitycheck(ui, set(nodes), bases)
289 290 if failures > 1:
290 291 ui.warn(("%d failures\n" % failures))
291 292 return 1
292 293
293 294 def _sanitycheck(ui, nodes, bases):
294 295 """
295 296 Does some basic sanity checking on a packfiles with ``nodes`` ``bases`` (a
296 297 mapping of node->base):
297 298
298 299 - Each deltabase must itself be a node elsewhere in the pack
299 300 - There must be no cycles
300 301 """
301 302 failures = 0
302 303 for node in nodes:
303 304 seen = set()
304 305 current = node
305 306 deltabase = bases[current]
306 307
307 308 while deltabase != nullid:
308 309 if deltabase not in nodes:
309 310 ui.warn(("Bad entry: %s has an unknown deltabase (%s)\n" %
310 311 (short(node), short(deltabase))))
311 312 failures += 1
312 313 break
313 314
314 315 if deltabase in seen:
315 316 ui.warn(("Bad entry: %s has a cycle (at %s)\n" %
316 317 (short(node), short(deltabase))))
317 318 failures += 1
318 319 break
319 320
320 321 current = deltabase
321 322 seen.add(current)
322 323 deltabase = bases[current]
323 324 # Since ``node`` begins a valid chain, reset/memoize its base to nullid
324 325 # so we don't traverse it again.
325 326 bases[node] = nullid
326 327 return failures
327 328
328 329 def dumpdeltachain(ui, deltachain, **opts):
329 330 hashformatter = hex
330 331 hashlen = 40
331 332
332 333 lastfilename = None
333 334 for filename, node, filename, deltabasenode, delta in deltachain:
334 335 if filename != lastfilename:
335 336 ui.write("\n%s\n" % filename)
336 337 lastfilename = filename
337 338 ui.write("%s %s %s %s\n" % (
338 339 "Node".ljust(hashlen),
339 340 "Delta Base".ljust(hashlen),
340 341 "Delta SHA1".ljust(hashlen),
341 342 "Delta Length".ljust(6),
342 343 ))
343 344
344 345 ui.write("%s %s %s %s\n" % (
345 346 hashformatter(node),
346 347 hashformatter(deltabasenode),
347 hashlib.sha1(delta).hexdigest(),
348 nodemod.hex(hashlib.sha1(delta).digest()),
348 349 len(delta)))
349 350
350 351 def debughistorypack(ui, path):
351 352 if '.hist' in path:
352 353 path = path[:path.index('.hist')]
353 354 hpack = historypack.historypack(path)
354 355
355 356 lastfilename = None
356 357 for entry in hpack.iterentries():
357 358 filename, node, p1node, p2node, linknode, copyfrom = entry
358 359 if filename != lastfilename:
359 360 ui.write("\n%s\n" % filename)
360 361 ui.write("%s%s%s%s%s\n" % (
361 362 "Node".ljust(14),
362 363 "P1 Node".ljust(14),
363 364 "P2 Node".ljust(14),
364 365 "Link Node".ljust(14),
365 366 "Copy From"))
366 367 lastfilename = filename
367 368 ui.write("%s %s %s %s %s\n" % (short(node), short(p1node),
368 369 short(p2node), short(linknode), copyfrom))
369 370
370 371 def debugwaitonrepack(repo):
371 372 with extutil.flock(repack.repacklockvfs(repo).join('repacklock'), ''):
372 373 return
373 374
374 375 def debugwaitonprefetch(repo):
375 376 with repo._lock(repo.svfs, "prefetchlock", True, None,
376 377 None, _('prefetching in %s') % repo.origroot):
377 378 pass
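
Editorial note (not part of the commit): parsefileblob() above walks the remotefilelog blob layout, i.e. a size header terminated by NUL, the raw file text, and then fixed 80-byte ancestry records (node, p1, p2, linknode) each followed by a variable-length copyfrom string and a NUL. The sketch below is a hedged, standalone rendering of that walk for the v0 header format only; the function name is illustrative.

    def iterancestors(blob):
        # v0 header: "<size>" followed by NUL, then <size> bytes of text
        headerend = blob.index(b'\0')
        size = int(blob[:headerend])
        start = headerend + 1 + size
        while start < len(blob):
            # four 20-byte nodes, then copyfrom up to the next NUL
            divider = blob.index(b'\0', start + 80)
            node = blob[start:start + 20]
            p1 = blob[start + 20:start + 40]
            p2 = blob[start + 40:start + 60]
            linknode = blob[start + 60:start + 80]
            copyfrom = blob[start + 80:divider]
            yield node, p1, p2, linknode, copyfrom
            start = divider + 1
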
@@ -1,588 +1,589 b''
1 1 # fileserverclient.py - client for communicating with the cache process
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import io
12 12 import os
13 13 import threading
14 14 import time
15 15 import zlib
16 16
17 17 from mercurial.i18n import _
18 18 from mercurial.node import bin, hex, nullid
19 19 from mercurial import (
20 20 error,
21 node,
21 22 pycompat,
22 23 revlog,
23 24 sshpeer,
24 25 util,
25 26 wireprotov1peer,
26 27 )
27 28 from mercurial.utils import procutil
28 29
29 30 from . import (
30 31 constants,
31 32 contentstore,
32 33 metadatastore,
33 34 )
34 35
35 36 _sshv1peer = sshpeer.sshv1peer
36 37
37 38 # Statistics for debugging
38 39 fetchcost = 0
39 40 fetches = 0
40 41 fetched = 0
41 42 fetchmisses = 0
42 43
43 44 _lfsmod = None
44 45 _downloading = _('downloading')
45 46
46 47 def getcachekey(reponame, file, id):
47 pathhash = hashlib.sha1(file).hexdigest()
48 pathhash = node.hex(hashlib.sha1(file).digest())
48 49 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
49 50
50 51 def getlocalkey(file, id):
51 pathhash = hashlib.sha1(file).hexdigest()
52 pathhash = node.hex(hashlib.sha1(file).digest())
52 53 return os.path.join(pathhash, id)
53 54
54 55 def peersetup(ui, peer):
55 56
56 57 class remotefilepeer(peer.__class__):
57 58 @wireprotov1peer.batchable
58 59 def x_rfl_getfile(self, file, node):
59 60 if not self.capable('x_rfl_getfile'):
60 61 raise error.Abort(
61 62 'configured remotefile server does not support getfile')
62 63 f = wireprotov1peer.future()
63 64 yield {'file': file, 'node': node}, f
64 65 code, data = f.value.split('\0', 1)
65 66 if int(code):
66 67 raise error.LookupError(file, node, data)
67 68 yield data
68 69
69 70 @wireprotov1peer.batchable
70 71 def x_rfl_getflogheads(self, path):
71 72 if not self.capable('x_rfl_getflogheads'):
72 73 raise error.Abort('configured remotefile server does not '
73 74 'support getflogheads')
74 75 f = wireprotov1peer.future()
75 76 yield {'path': path}, f
76 77 heads = f.value.split('\n') if f.value else []
77 78 yield heads
78 79
79 80 def _updatecallstreamopts(self, command, opts):
80 81 if command != 'getbundle':
81 82 return
82 83 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
83 84 not in self.capabilities()):
84 85 return
85 86 if not util.safehasattr(self, '_localrepo'):
86 87 return
87 88 if (constants.SHALLOWREPO_REQUIREMENT
88 89 not in self._localrepo.requirements):
89 90 return
90 91
91 92 bundlecaps = opts.get('bundlecaps')
92 93 if bundlecaps:
93 94 bundlecaps = [bundlecaps]
94 95 else:
95 96 bundlecaps = []
96 97
97 98 # shallow, includepattern, and excludepattern are a hacky way of
98 99 # carrying over data from the local repo to this getbundle
99 100 # command. We need to do it this way because bundle1 getbundle
100 101 # doesn't provide any other place we can hook in to manipulate
101 102 # getbundle args before it goes across the wire. Once we get rid
102 103 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
103 104 # do this more cleanly.
104 105 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
105 106 if self._localrepo.includepattern:
106 107 patterns = '\0'.join(self._localrepo.includepattern)
107 108 includecap = "includepattern=" + patterns
108 109 bundlecaps.append(includecap)
109 110 if self._localrepo.excludepattern:
110 111 patterns = '\0'.join(self._localrepo.excludepattern)
111 112 excludecap = "excludepattern=" + patterns
112 113 bundlecaps.append(excludecap)
113 114 opts['bundlecaps'] = ','.join(bundlecaps)
114 115
115 116 def _sendrequest(self, command, args, **opts):
116 117 self._updatecallstreamopts(command, args)
117 118 return super(remotefilepeer, self)._sendrequest(command, args,
118 119 **opts)
119 120
120 121 def _callstream(self, command, **opts):
121 122 supertype = super(remotefilepeer, self)
122 123 if not util.safehasattr(supertype, '_sendrequest'):
123 124 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
124 125 return super(remotefilepeer, self)._callstream(command, **opts)
125 126
126 127 peer.__class__ = remotefilepeer
127 128
128 129 class cacheconnection(object):
129 130 """The connection for communicating with the remote cache. Performs
130 131 gets and sets by communicating with an external process that has the
131 132 cache-specific implementation.
132 133 """
133 134 def __init__(self):
134 135 self.pipeo = self.pipei = self.pipee = None
135 136 self.subprocess = None
136 137 self.connected = False
137 138
138 139 def connect(self, cachecommand):
139 140 if self.pipeo:
140 141 raise error.Abort(_("cache connection already open"))
141 142 self.pipei, self.pipeo, self.pipee, self.subprocess = \
142 143 procutil.popen4(cachecommand)
143 144 self.connected = True
144 145
145 146 def close(self):
146 147 def tryclose(pipe):
147 148 try:
148 149 pipe.close()
149 150 except Exception:
150 151 pass
151 152 if self.connected:
152 153 try:
153 154 self.pipei.write("exit\n")
154 155 except Exception:
155 156 pass
156 157 tryclose(self.pipei)
157 158 self.pipei = None
158 159 tryclose(self.pipeo)
159 160 self.pipeo = None
160 161 tryclose(self.pipee)
161 162 self.pipee = None
162 163 try:
163 164 # Wait for process to terminate, making sure to avoid deadlock.
164 165 # See https://docs.python.org/2/library/subprocess.html for
165 166 # warnings about wait() and deadlocking.
166 167 self.subprocess.communicate()
167 168 except Exception:
168 169 pass
169 170 self.subprocess = None
170 171 self.connected = False
171 172
172 173 def request(self, request, flush=True):
173 174 if self.connected:
174 175 try:
175 176 self.pipei.write(request)
176 177 if flush:
177 178 self.pipei.flush()
178 179 except IOError:
179 180 self.close()
180 181
181 182 def receiveline(self):
182 183 if not self.connected:
183 184 return None
184 185 try:
185 186 result = self.pipeo.readline()[:-1]
186 187 if not result:
187 188 self.close()
188 189 except IOError:
189 190 self.close()
190 191
191 192 return result
192 193
193 194 def _getfilesbatch(
194 195 remote, receivemissing, progresstick, missed, idmap, batchsize):
195 196 # Over http(s), iterbatch is a streamy method and we can start
196 197 # looking at results early. This means we send one (potentially
197 198 # large) request, but then we show nice progress as we process
198 199 # file results, rather than showing chunks of $batchsize in
199 200 # progress.
200 201 #
201 202 # Over ssh, iterbatch isn't streamy because batch() wasn't
202 203 # explicitly designed as a streaming method. In the future we
203 204 # should probably introduce a streambatch() method upstream and
204 205 # use that for this.
205 206 with remote.commandexecutor() as e:
206 207 futures = []
207 208 for m in missed:
208 209 futures.append(e.callcommand('x_rfl_getfile', {
209 210 'file': idmap[m],
210 211 'node': m[-40:]
211 212 }))
212 213
213 214 for i, m in enumerate(missed):
214 215 r = futures[i].result()
215 216 futures[i] = None # release memory
216 217 file_ = idmap[m]
217 218 node = m[-40:]
218 219 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
219 220 progresstick()
220 221
221 222 def _getfiles_optimistic(
222 223 remote, receivemissing, progresstick, missed, idmap, step):
223 224 remote._callstream("x_rfl_getfiles")
224 225 i = 0
225 226 pipeo = remote._pipeo
226 227 pipei = remote._pipei
227 228 while i < len(missed):
228 229 # issue a batch of requests
229 230 start = i
230 231 end = min(len(missed), start + step)
231 232 i = end
232 233 for missingid in missed[start:end]:
233 234 # issue new request
234 235 versionid = missingid[-40:]
235 236 file = idmap[missingid]
236 237 sshrequest = "%s%s\n" % (versionid, file)
237 238 pipeo.write(sshrequest)
238 239 pipeo.flush()
239 240
240 241 # receive batch results
241 242 for missingid in missed[start:end]:
242 243 versionid = missingid[-40:]
243 244 file = idmap[missingid]
244 245 receivemissing(pipei, file, versionid)
245 246 progresstick()
246 247
247 248 # End the command
248 249 pipeo.write('\n')
249 250 pipeo.flush()
250 251
251 252 def _getfiles_threaded(
252 253 remote, receivemissing, progresstick, missed, idmap, step):
253 254 remote._callstream("getfiles")
254 255 pipeo = remote._pipeo
255 256 pipei = remote._pipei
256 257
257 258 def writer():
258 259 for missingid in missed:
259 260 versionid = missingid[-40:]
260 261 file = idmap[missingid]
261 262 sshrequest = "%s%s\n" % (versionid, file)
262 263 pipeo.write(sshrequest)
263 264 pipeo.flush()
264 265 writerthread = threading.Thread(target=writer)
265 266 writerthread.daemon = True
266 267 writerthread.start()
267 268
268 269 for missingid in missed:
269 270 versionid = missingid[-40:]
270 271 file = idmap[missingid]
271 272 receivemissing(pipei, file, versionid)
272 273 progresstick()
273 274
274 275 writerthread.join()
275 276 # End the command
276 277 pipeo.write('\n')
277 278 pipeo.flush()
278 279
279 280 class fileserverclient(object):
280 281 """A client for requesting files from the remote file server.
281 282 """
282 283 def __init__(self, repo):
283 284 ui = repo.ui
284 285 self.repo = repo
285 286 self.ui = ui
286 287 self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
287 288 if self.cacheprocess:
288 289 self.cacheprocess = util.expandpath(self.cacheprocess)
289 290
290 291 # This option causes remotefilelog to pass the full file path to the
291 292 # cacheprocess instead of a hashed key.
292 293 self.cacheprocesspasspath = ui.configbool(
293 294 "remotefilelog", "cacheprocess.includepath")
294 295
295 296 self.debugoutput = ui.configbool("remotefilelog", "debug")
296 297
297 298 self.remotecache = cacheconnection()
298 299
299 300 def setstore(self, datastore, historystore, writedata, writehistory):
300 301 self.datastore = datastore
301 302 self.historystore = historystore
302 303 self.writedata = writedata
303 304 self.writehistory = writehistory
304 305
305 306 def _connect(self):
306 307 return self.repo.connectionpool.get(self.repo.fallbackpath)
307 308
308 309 def request(self, fileids):
309 310 """Takes a list of filename/node pairs and fetches them from the
310 311 server. Files are stored in the local cache.
311 312 A list of nodes that the server couldn't find is returned.
312 313 If the connection fails, an exception is raised.
313 314 """
314 315 if not self.remotecache.connected:
315 316 self.connect()
316 317 cache = self.remotecache
317 318 writedata = self.writedata
318 319
319 320 repo = self.repo
320 321 count = len(fileids)
321 322 request = "get\n%d\n" % count
322 323 idmap = {}
323 324 reponame = repo.name
324 325 for file, id in fileids:
325 326 fullid = getcachekey(reponame, file, id)
326 327 if self.cacheprocesspasspath:
327 328 request += file + '\0'
328 329 request += fullid + "\n"
329 330 idmap[fullid] = file
330 331
331 332 cache.request(request)
332 333
333 334 total = count
334 335 self.ui.progress(_downloading, 0, total=count)
335 336
336 337 missed = []
337 338 count = 0
338 339 while True:
339 340 missingid = cache.receiveline()
340 341 if not missingid:
341 342 missedset = set(missed)
342 343 for missingid in idmap.iterkeys():
343 344 if not missingid in missedset:
344 345 missed.append(missingid)
345 346 self.ui.warn(_("warning: cache connection closed early - " +
346 347 "falling back to server\n"))
347 348 break
348 349 if missingid == "0":
349 350 break
350 351 if missingid.startswith("_hits_"):
351 352 # receive progress reports
352 353 parts = missingid.split("_")
353 354 count += int(parts[2])
354 355 self.ui.progress(_downloading, count, total=total)
355 356 continue
356 357
357 358 missed.append(missingid)
358 359
359 360 global fetchmisses
360 361 fetchmisses += len(missed)
361 362
362 363 count = [total - len(missed)]
363 364 fromcache = count[0]
364 365 self.ui.progress(_downloading, count[0], total=total)
365 366 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
366 367 count[0], total, hit=count[0], total=total)
367 368
368 369 oldumask = os.umask(0o002)
369 370 try:
370 371 # receive cache misses from master
371 372 if missed:
372 373 def progresstick():
373 374 count[0] += 1
374 375 self.ui.progress(_downloading, count[0], total=total)
375 376 # When verbose is true, sshpeer prints 'running ssh...'
376 377 # to stdout, which can interfere with some command
377 378 # outputs
378 379 verbose = self.ui.verbose
379 380 self.ui.verbose = False
380 381 try:
381 382 with self._connect() as conn:
382 383 remote = conn.peer
383 384 if remote.capable(
384 385 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
385 386 if not isinstance(remote, _sshv1peer):
386 387 raise error.Abort('remotefilelog requires ssh '
387 388 'servers')
388 389 step = self.ui.configint('remotefilelog',
389 390 'getfilesstep')
390 391 getfilestype = self.ui.config('remotefilelog',
391 392 'getfilestype')
392 393 if getfilestype == 'threaded':
393 394 _getfiles = _getfiles_threaded
394 395 else:
395 396 _getfiles = _getfiles_optimistic
396 397 _getfiles(remote, self.receivemissing, progresstick,
397 398 missed, idmap, step)
398 399 elif remote.capable("x_rfl_getfile"):
399 400 if remote.capable('batch'):
400 401 batchdefault = 100
401 402 else:
402 403 batchdefault = 10
403 404 batchsize = self.ui.configint(
404 405 'remotefilelog', 'batchsize', batchdefault)
405 406 _getfilesbatch(
406 407 remote, self.receivemissing, progresstick,
407 408 missed, idmap, batchsize)
408 409 else:
409 410 raise error.Abort("configured remotefilelog server"
410 411 " does not support remotefilelog")
411 412
412 413 self.ui.log("remotefilefetchlog",
413 414 "Success\n",
414 415 fetched_files = count[0] - fromcache,
415 416 total_to_fetch = total - fromcache)
416 417 except Exception:
417 418 self.ui.log("remotefilefetchlog",
418 419 "Fail\n",
419 420 fetched_files = count[0] - fromcache,
420 421 total_to_fetch = total - fromcache)
421 422 raise
422 423 finally:
423 424 self.ui.verbose = verbose
424 425 # send to memcache
425 426 count[0] = len(missed)
426 427 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
427 428 cache.request(request)
428 429
429 430 self.ui.progress(_downloading, None)
430 431
431 432 # mark ourselves as a user of this cache
432 433 writedata.markrepo(self.repo.path)
433 434 finally:
434 435 os.umask(oldumask)
435 436
436 437 def receivemissing(self, pipe, filename, node):
437 438 line = pipe.readline()[:-1]
438 439 if not line:
439 440 raise error.ResponseError(_("error downloading file contents:"),
440 441 _("connection closed early"))
441 442 size = int(line)
442 443 data = pipe.read(size)
443 444 if len(data) != size:
444 445 raise error.ResponseError(_("error downloading file contents:"),
445 446 _("only received %s of %s bytes")
446 447 % (len(data), size))
447 448
448 449 self.writedata.addremotefilelognode(filename, bin(node),
449 450 zlib.decompress(data))
450 451
451 452 def connect(self):
452 453 if self.cacheprocess:
453 454 cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
454 455 self.remotecache.connect(cmd)
455 456 else:
456 457 # If no cache process is specified, we fake one that always
457 458 # returns cache misses. This enables tests to run easily
458 459 # and may eventually allow us to be a drop in replacement
459 460 # for the largefiles extension.
460 461 class simplecache(object):
461 462 def __init__(self):
462 463 self.missingids = []
463 464 self.connected = True
464 465
465 466 def close(self):
466 467 pass
467 468
468 469 def request(self, value, flush=True):
469 470 lines = value.split("\n")
470 471 if lines[0] != "get":
471 472 return
472 473 self.missingids = lines[2:-1]
473 474 self.missingids.append('0')
474 475
475 476 def receiveline(self):
476 477 if len(self.missingids) > 0:
477 478 return self.missingids.pop(0)
478 479 return None
479 480
480 481 self.remotecache = simplecache()
481 482
482 483 def close(self):
483 484 if fetches:
484 485 msg = ("%s files fetched over %d fetches - " +
485 486 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
486 487 fetched,
487 488 fetches,
488 489 fetchmisses,
489 490 float(fetched - fetchmisses) / float(fetched) * 100.0,
490 491 fetchcost)
491 492 if self.debugoutput:
492 493 self.ui.warn(msg)
493 494 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
494 495 remotefilelogfetched=fetched,
495 496 remotefilelogfetches=fetches,
496 497 remotefilelogfetchmisses=fetchmisses,
497 498 remotefilelogfetchtime=fetchcost * 1000)
498 499
499 500 if self.remotecache.connected:
500 501 self.remotecache.close()
501 502
502 503 def prefetch(self, fileids, force=False, fetchdata=True,
503 504 fetchhistory=False):
504 505 """downloads the given file versions to the cache
505 506 """
506 507 repo = self.repo
507 508 idstocheck = []
508 509 for file, id in fileids:
509 510 # hack
510 511 # - we don't use .hgtags
511 512 # - workingctx produces ids with length 42,
512 513 # which we skip since they aren't in any cache
513 514 if (file == '.hgtags' or len(id) == 42
514 515 or not repo.shallowmatch(file)):
515 516 continue
516 517
517 518 idstocheck.append((file, bin(id)))
518 519
519 520 datastore = self.datastore
520 521 historystore = self.historystore
521 522 if force:
522 523 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
523 524 historystore = metadatastore.unionmetadatastore(
524 525 *repo.sharedhistorystores)
525 526
526 527 missingids = set()
527 528 if fetchdata:
528 529 missingids.update(datastore.getmissing(idstocheck))
529 530 if fetchhistory:
530 531 missingids.update(historystore.getmissing(idstocheck))
531 532
532 533 # partition missing nodes into nullid and not-nullid so we can
533 534 # warn about this filtering potentially shadowing bugs.
534 535 nullids = len([None for unused, id in missingids if id == nullid])
535 536 if nullids:
536 537 missingids = [(f, id) for f, id in missingids if id != nullid]
537 538 repo.ui.develwarn(
538 539 ('remotefilelog not fetching %d null revs'
539 540 ' - this is likely hiding bugs' % nullids),
540 541 config='remotefilelog-ext')
541 542 if missingids:
542 543 global fetches, fetched, fetchcost
543 544 fetches += 1
544 545
545 546 # We want to be able to detect excess individual file downloads, so
546 547 # let's log that information for debugging.
547 548 if fetches >= 15 and fetches < 18:
548 549 if fetches == 15:
549 550 fetchwarning = self.ui.config('remotefilelog',
550 551 'fetchwarning')
551 552 if fetchwarning:
552 553 self.ui.warn(fetchwarning + '\n')
553 554 self.logstacktrace()
554 555 missingids = [(file, hex(id)) for file, id in missingids]
555 556 fetched += len(missingids)
556 557 start = time.time()
557 558 missingids = self.request(missingids)
558 559 if missingids:
559 560 raise error.Abort(_("unable to download %d files") %
560 561 len(missingids))
561 562 fetchcost += time.time() - start
562 563 self._lfsprefetch(fileids)
563 564
564 565 def _lfsprefetch(self, fileids):
565 566 if not _lfsmod or not util.safehasattr(
566 567 self.repo.svfs, 'lfslocalblobstore'):
567 568 return
568 569 if not _lfsmod.wrapper.candownload(self.repo):
569 570 return
570 571 pointers = []
571 572 store = self.repo.svfs.lfslocalblobstore
572 573 for file, id in fileids:
573 574 node = bin(id)
574 575 rlog = self.repo.file(file)
575 576 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
576 577 text = rlog.revision(node, raw=True)
577 578 p = _lfsmod.pointer.deserialize(text)
578 579 oid = p.oid()
579 580 if not store.has(oid):
580 581 pointers.append(p)
581 582 if len(pointers) > 0:
582 583 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
583 584 assert all(store.has(p.oid()) for p in pointers)
584 585
585 586 def logstacktrace(self):
586 587 import traceback
587 588 self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
588 589 ''.join(traceback.format_stack()))
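
Editorial note (not part of the commit): getcachekey() and getlocalkey() above hash the file path with SHA-1 and fan the cache out by the first two hex characters of that hash. The following usage sketch assumes node.hex() behaves like binascii.hexlify(); the repository name and node id are made-up values.

    import hashlib
    import os
    from binascii import hexlify  # stands in here for mercurial.node.hex

    def cachekey(reponame, file, id):
        pathhash = hexlify(hashlib.sha1(file).digest())
        return os.path.join(reponame, pathhash[:2], pathhash[2:], id)

    # e.g. b'myrepo/<2 hex chars>/<38 hex chars>/<40-char hex node>'
    print(cachekey(b'myrepo', b'dir/file.py', b'a' * 40))
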
@@ -1,491 +1,492 b''
1 1 # shallowutil.py -- remotefilelog utilities
2 2 #
3 3 # Copyright 2014 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7 from __future__ import absolute_import
8 8
9 9 import collections
10 10 import errno
11 11 import hashlib
12 12 import os
13 13 import stat
14 14 import struct
15 15 import tempfile
16 16
17 17 from mercurial.i18n import _
18 18 from mercurial import (
19 19 error,
20 node,
20 21 pycompat,
21 22 revlog,
22 23 util,
23 24 )
24 25 from mercurial.utils import (
25 26 storageutil,
26 27 stringutil,
27 28 )
28 29 from . import constants
29 30
30 31 if not pycompat.iswindows:
31 32 import grp
32 33
33 34 def isenabled(repo):
34 35 """returns whether the repository is remotefilelog enabled or not"""
35 36 return constants.SHALLOWREPO_REQUIREMENT in repo.requirements
36 37
37 38 def getcachekey(reponame, file, id):
38 pathhash = hashlib.sha1(file).hexdigest()
39 pathhash = node.hex(hashlib.sha1(file).digest())
39 40 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
40 41
41 42 def getlocalkey(file, id):
42 pathhash = hashlib.sha1(file).hexdigest()
43 pathhash = node.hex(hashlib.sha1(file).digest())
43 44 return os.path.join(pathhash, id)
44 45
45 46 def getcachepath(ui, allowempty=False):
46 47 cachepath = ui.config("remotefilelog", "cachepath")
47 48 if not cachepath:
48 49 if allowempty:
49 50 return None
50 51 else:
51 52 raise error.Abort(_("could not find config option "
52 53 "remotefilelog.cachepath"))
53 54 return util.expandpath(cachepath)
54 55
55 56 def getcachepackpath(repo, category):
56 57 cachepath = getcachepath(repo.ui)
57 58 if category != constants.FILEPACK_CATEGORY:
58 59 return os.path.join(cachepath, repo.name, 'packs', category)
59 60 else:
60 61 return os.path.join(cachepath, repo.name, 'packs')
61 62
62 63 def getlocalpackpath(base, category):
63 64 return os.path.join(base, 'packs', category)
64 65
65 66 def createrevlogtext(text, copyfrom=None, copyrev=None):
66 67 """returns a string that matches the revlog contents in a
67 68 traditional revlog
68 69 """
69 70 meta = {}
70 71 if copyfrom or text.startswith('\1\n'):
71 72 if copyfrom:
72 73 meta['copy'] = copyfrom
73 74 meta['copyrev'] = copyrev
74 75 text = storageutil.packmeta(meta, text)
75 76
76 77 return text
77 78
78 79 def parsemeta(text):
79 80 """parse mercurial filelog metadata"""
80 81 meta, size = storageutil.parsemeta(text)
81 82 if text.startswith('\1\n'):
82 83 s = text.index('\1\n', 2)
83 84 text = text[s + 2:]
84 85 return meta or {}, text
85 86
86 87 def sumdicts(*dicts):
87 88 """Adds all the values of *dicts together into one dictionary. This assumes
88 89 the values in *dicts are all summable.
89 90
90 91 e.g. [{'a': 4', 'b': 2}, {'b': 3, 'c': 1}] -> {'a': 4, 'b': 5, 'c': 1}
91 92 """
92 93 result = collections.defaultdict(lambda: 0)
93 94 for dict in dicts:
94 95 for k, v in dict.iteritems():
95 96 result[k] += v
96 97 return result
97 98
98 99 def prefixkeys(dict, prefix):
99 100 """Returns ``dict`` with ``prefix`` prepended to all its keys."""
100 101 result = {}
101 102 for k, v in dict.iteritems():
102 103 result[prefix + k] = v
103 104 return result
104 105
105 106 def reportpackmetrics(ui, prefix, *stores):
106 107 dicts = [s.getmetrics() for s in stores]
107 108 dict = prefixkeys(sumdicts(*dicts), prefix + '_')
108 109 ui.log(prefix + "_packsizes", "", **pycompat.strkwargs(dict))
109 110
110 111 def _parsepackmeta(metabuf):
111 112 """parse datapack meta, bytes (<metadata-list>) -> dict
112 113
113 114 The dict contains raw content - both keys and values are strings.
114 115 Upper-level business may want to convert some of them to other types like
115 116 integers, on their own.
116 117
117 118 raise ValueError if the data is corrupted
118 119 """
119 120 metadict = {}
120 121 offset = 0
121 122 buflen = len(metabuf)
122 123 while buflen - offset >= 3:
123 124 key = metabuf[offset]
124 125 offset += 1
125 126 metalen = struct.unpack_from('!H', metabuf, offset)[0]
126 127 offset += 2
127 128 if offset + metalen > buflen:
128 129 raise ValueError('corrupted metadata: incomplete buffer')
129 130 value = metabuf[offset:offset + metalen]
130 131 metadict[key] = value
131 132 offset += metalen
132 133 if offset != buflen:
133 134 raise ValueError('corrupted metadata: redundant data')
134 135 return metadict
135 136
136 137 def _buildpackmeta(metadict):
137 138 """reverse of _parsepackmeta, dict -> bytes (<metadata-list>)
138 139
139 140 The dict contains raw content - both keys and values are strings.
140 141 Upper-level business may want to serialize some of other types (like
141 142 integers) to strings before calling this function.
142 143
143 144 raise ProgrammingError when metadata key is illegal, or ValueError if
144 145 length limit is exceeded
145 146 """
146 147 metabuf = ''
147 148 for k, v in sorted((metadict or {}).iteritems()):
148 149 if len(k) != 1:
149 150 raise error.ProgrammingError('packmeta: illegal key: %s' % k)
150 151 if len(v) > 0xfffe:
151 152 raise ValueError('metadata value is too long: 0x%x > 0xfffe'
152 153 % len(v))
153 154 metabuf += k
154 155 metabuf += struct.pack('!H', len(v))
155 156 metabuf += v
156 157 # len(metabuf) is guaranteed representable in 4 bytes, because there are
157 158 # only 256 keys, and for each value, len(value) <= 0xfffe.
158 159 return metabuf
159 160
160 161 _metaitemtypes = {
161 162 constants.METAKEYFLAG: (int, pycompat.long),
162 163 constants.METAKEYSIZE: (int, pycompat.long),
163 164 }
164 165
165 166 def buildpackmeta(metadict):
166 167 """like _buildpackmeta, but typechecks metadict and normalize it.
167 168
168 169 This means, METAKEYSIZE and METAKEYSIZE should have integers as values,
169 170 and METAKEYFLAG will be dropped if its value is 0.
170 171 """
171 172 newmeta = {}
172 173 for k, v in (metadict or {}).iteritems():
173 174 expectedtype = _metaitemtypes.get(k, (bytes,))
174 175 if not isinstance(v, expectedtype):
175 176 raise error.ProgrammingError('packmeta: wrong type of key %s' % k)
176 177 # normalize int to binary buffer
177 178 if int in expectedtype:
178 179 # optimization: remove flag if it's 0 to save space
179 180 if k == constants.METAKEYFLAG and v == 0:
180 181 continue
181 182 v = int2bin(v)
182 183 newmeta[k] = v
183 184 return _buildpackmeta(newmeta)
184 185
185 186 def parsepackmeta(metabuf):
186 187 """like _parsepackmeta, but convert fields to desired types automatically.
187 188
188 189 This means, METAKEYFLAG and METAKEYSIZE fields will be converted to
189 190 integers.
190 191 """
191 192 metadict = _parsepackmeta(metabuf)
192 193 for k, v in metadict.iteritems():
193 194 if k in _metaitemtypes and int in _metaitemtypes[k]:
194 195 metadict[k] = bin2int(v)
195 196 return metadict
196 197
197 198 def int2bin(n):
198 199 """convert a non-negative integer to raw binary buffer"""
199 200 buf = bytearray()
200 201 while n > 0:
201 202 buf.insert(0, n & 0xff)
202 203 n >>= 8
203 204 return bytes(buf)
204 205
205 206 def bin2int(buf):
206 207 """the reverse of int2bin, convert a binary buffer to an integer"""
207 208 x = 0
208 209 for b in bytearray(buf):
209 210 x <<= 8
210 211 x |= b
211 212 return x
212 213
213 214 def parsesizeflags(raw):
214 215 """given a remotefilelog blob, return (headersize, rawtextsize, flags)
215 216
216 217 see remotefilelogserver.createfileblob for the format.
217 218 raise RuntimeError if the content is illformed.
218 219 """
219 220 flags = revlog.REVIDX_DEFAULT_FLAGS
220 221 size = None
221 222 try:
222 223 index = raw.index('\0')
223 224 header = raw[:index]
224 225 if header.startswith('v'):
225 226 # v1 and above, header starts with 'v'
226 227 if header.startswith('v1\n'):
227 228 for s in header.split('\n'):
228 229 if s.startswith(constants.METAKEYSIZE):
229 230 size = int(s[len(constants.METAKEYSIZE):])
230 231 elif s.startswith(constants.METAKEYFLAG):
231 232 flags = int(s[len(constants.METAKEYFLAG):])
232 233 else:
233 234 raise RuntimeError('unsupported remotefilelog header: %s'
234 235 % header)
235 236 else:
236 237 # v0, str(int(size)) is the header
237 238 size = int(header)
238 239 except ValueError:
239 240 raise RuntimeError("unexpected remotefilelog header: illegal format")
240 241 if size is None:
241 242 raise RuntimeError("unexpected remotefilelog header: no size found")
242 243 return index + 1, size, flags
243 244
244 245 def buildfileblobheader(size, flags, version=None):
245 246 """return the header of a remotefilelog blob.
246 247
247 248 see remotefilelogserver.createfileblob for the format.
248 249 approximately the reverse of parsesizeflags.
249 250
250 251 version could be 0 or 1, or None (auto decide).
251 252 """
252 253 # choose v0 if flags is empty, otherwise v1
253 254 if version is None:
254 255 version = int(bool(flags))
255 256 if version == 1:
256 257 header = ('v1\n%s%d\n%s%d'
257 258 % (constants.METAKEYSIZE, size,
258 259 constants.METAKEYFLAG, flags))
259 260 elif version == 0:
260 261 if flags:
261 262 raise error.ProgrammingError('fileblob v0 does not support flag')
262 263 header = '%d' % size
263 264 else:
264 265 raise error.ProgrammingError('unknown fileblob version %d' % version)
265 266 return header
266 267
267 268 def ancestormap(raw):
268 269 offset, size, flags = parsesizeflags(raw)
269 270 start = offset + size
270 271
271 272 mapping = {}
272 273 while start < len(raw):
273 274 divider = raw.index('\0', start + 80)
274 275
275 276 currentnode = raw[start:(start + 20)]
276 277 p1 = raw[(start + 20):(start + 40)]
277 278 p2 = raw[(start + 40):(start + 60)]
278 279 linknode = raw[(start + 60):(start + 80)]
279 280 copyfrom = raw[(start + 80):divider]
280 281
281 282 mapping[currentnode] = (p1, p2, linknode, copyfrom)
282 283 start = divider + 1
283 284
284 285 return mapping
285 286
286 287 def readfile(path):
287 288 f = open(path, 'rb')
288 289 try:
289 290 result = f.read()
290 291
291 292 # we should never have empty files
292 293 if not result:
293 294 os.remove(path)
294 295 raise IOError("empty file: %s" % path)
295 296
296 297 return result
297 298 finally:
298 299 f.close()
299 300
300 301 def unlinkfile(filepath):
301 302 if pycompat.iswindows:
302 303 # On Windows, os.unlink cannot delete readonly files
303 304 os.chmod(filepath, stat.S_IWUSR)
304 305 os.unlink(filepath)
305 306
306 307 def renamefile(source, destination):
307 308 if pycompat.iswindows:
308 309 # On Windows, os.rename cannot rename readonly files
309 310 # and cannot overwrite destination if it exists
310 311 os.chmod(source, stat.S_IWUSR)
311 312 if os.path.isfile(destination):
312 313 os.chmod(destination, stat.S_IWUSR)
313 314 os.unlink(destination)
314 315
315 316 os.rename(source, destination)
316 317
317 318 def writefile(path, content, readonly=False):
318 319 dirname, filename = os.path.split(path)
319 320 if not os.path.exists(dirname):
320 321 try:
321 322 os.makedirs(dirname)
322 323 except OSError as ex:
323 324 if ex.errno != errno.EEXIST:
324 325 raise
325 326
326 327 fd, temp = tempfile.mkstemp(prefix='.%s-' % filename, dir=dirname)
327 328 os.close(fd)
328 329
329 330 try:
330 331 f = util.posixfile(temp, 'wb')
331 332 f.write(content)
332 333 f.close()
333 334
334 335 if readonly:
335 336 mode = 0o444
336 337 else:
337 338 # tempfiles are created with 0o600, so we need to manually set the
338 339 # mode.
339 340 oldumask = os.umask(0)
340 341 # there's no way to get the umask without modifying it, so set it
341 342 # back
342 343 os.umask(oldumask)
343 344 mode = ~oldumask
344 345
345 346 renamefile(temp, path)
346 347 os.chmod(path, mode)
347 348 except Exception:
348 349 try:
349 350 unlinkfile(temp)
350 351 except OSError:
351 352 pass
352 353 raise
353 354
354 355 def sortnodes(nodes, parentfunc):
355 356 """Topologically sorts the nodes, using the parentfunc to find
356 357 the parents of nodes."""
357 358 nodes = set(nodes)
358 359 childmap = {}
359 360 parentmap = {}
360 361 roots = []
361 362
362 363 # Build a child and parent map
363 364 for n in nodes:
364 365 parents = [p for p in parentfunc(n) if p in nodes]
365 366 parentmap[n] = set(parents)
366 367 for p in parents:
367 368 childmap.setdefault(p, set()).add(n)
368 369 if not parents:
369 370 roots.append(n)
370 371
371 372 roots.sort()
372 373 # Process roots, adding children to the queue as they become roots
373 374 results = []
374 375 while roots:
375 376 n = roots.pop(0)
376 377 results.append(n)
377 378 if n in childmap:
378 379 children = childmap[n]
379 380 for c in children:
380 381 childparents = parentmap[c]
381 382 childparents.remove(n)
382 383 if len(childparents) == 0:
383 384 # insert at the beginning, that way child nodes
384 385 # are likely to be output immediately after their
385 386 # parents. This gives better compression results.
386 387 roots.insert(0, c)
387 388
388 389 return results
389 390
390 391 def readexactly(stream, n):
391 392 '''read n bytes from stream.read and abort if less was available'''
392 393 s = stream.read(n)
393 394 if len(s) < n:
394 395 raise error.Abort(_("stream ended unexpectedly"
395 396 " (got %d bytes, expected %d)")
396 397 % (len(s), n))
397 398 return s
398 399
399 400 def readunpack(stream, fmt):
400 401 data = readexactly(stream, struct.calcsize(fmt))
401 402 return struct.unpack(fmt, data)
402 403
403 404 def readpath(stream):
404 405 rawlen = readexactly(stream, constants.FILENAMESIZE)
405 406 pathlen = struct.unpack(constants.FILENAMESTRUCT, rawlen)[0]
406 407 return readexactly(stream, pathlen)
407 408
408 409 def readnodelist(stream):
409 410 rawlen = readexactly(stream, constants.NODECOUNTSIZE)
410 411 nodecount = struct.unpack(constants.NODECOUNTSTRUCT, rawlen)[0]
411 412 for i in pycompat.xrange(nodecount):
412 413 yield readexactly(stream, constants.NODESIZE)
413 414
414 415 def readpathlist(stream):
415 416 rawlen = readexactly(stream, constants.PATHCOUNTSIZE)
416 417 pathcount = struct.unpack(constants.PATHCOUNTSTRUCT, rawlen)[0]
417 418 for i in pycompat.xrange(pathcount):
418 419 yield readpath(stream)
419 420
420 421 def getgid(groupname):
421 422 try:
422 423 gid = grp.getgrnam(groupname).gr_gid
423 424 return gid
424 425 except KeyError:
425 426 return None
426 427
427 428 def setstickygroupdir(path, gid, warn=None):
428 429 if gid is None:
429 430 return
430 431 try:
431 432 os.chown(path, -1, gid)
432 433 os.chmod(path, 0o2775)
433 434 except (IOError, OSError) as ex:
434 435 if warn:
435 436 warn(_('unable to chown/chmod on %s: %s\n') % (path, ex))
436 437
437 438 def mkstickygroupdir(ui, path):
438 439 """Creates the given directory (if it doesn't exist) and give it a
439 440 particular group with setgid enabled."""
440 441 gid = None
441 442 groupname = ui.config("remotefilelog", "cachegroup")
442 443 if groupname:
443 444 gid = getgid(groupname)
444 445 if gid is None:
445 446 ui.warn(_('unable to resolve group name: %s\n') % groupname)
446 447
447 448 # we use a single stat syscall to test the existence and mode / group bit
448 449 st = None
449 450 try:
450 451 st = os.stat(path)
451 452 except OSError:
452 453 pass
453 454
454 455 if st:
455 456 # exists
456 457 if (st.st_mode & 0o2775) != 0o2775 or st.st_gid != gid:
457 458 # permission needs to be fixed
458 459 setstickygroupdir(path, gid, ui.warn)
459 460 return
460 461
461 462 oldumask = os.umask(0o002)
462 463 try:
463 464 missingdirs = [path]
464 465 path = os.path.dirname(path)
465 466 while path and not os.path.exists(path):
466 467 missingdirs.append(path)
467 468 path = os.path.dirname(path)
468 469
469 470 for path in reversed(missingdirs):
470 471 try:
471 472 os.mkdir(path)
472 473 except OSError as ex:
473 474 if ex.errno != errno.EEXIST:
474 475 raise
475 476
476 477 for path in missingdirs:
477 478 setstickygroupdir(path, gid, ui.warn)
478 479 finally:
479 480 os.umask(oldumask)
480 481
481 482 def getusername(ui):
482 483 try:
483 484 return stringutil.shortuser(ui.username())
484 485 except Exception:
485 486 return 'unknown'
486 487
487 488 def getreponame(ui):
488 489 reponame = ui.config('paths', 'default')
489 490 if reponame:
490 491 return os.path.basename(reponame)
491 492 return "unknown"
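
Editorial note (not part of the commit): _buildpackmeta() and _parsepackmeta() above encode datapack metadata as a sequence of (one-byte key, big-endian 16-bit length, raw value) entries. Below is a hedged round-trip sketch of that encoding; the helper names and the sample keys are illustrative only.

    import struct

    def buildmeta(metadict):
        buf = b''
        for key, value in sorted(metadict.items()):
            buf += key + struct.pack('!H', len(value)) + value
        return buf

    def parsemeta(buf):
        meta, offset = {}, 0
        while offset < len(buf):
            key = buf[offset:offset + 1]
            (length,) = struct.unpack_from('!H', buf, offset + 1)
            meta[key] = buf[offset + 3:offset + 3 + length]
            offset += 3 + length
        return meta

    sample = {b'f': b'\x01', b's': b'\x04\xd2'}  # flag and size as raw bytes
    assert parsemeta(buildmeta(sample)) == sample
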