remotefilelog: avoid temporarily using "count" variable as synonym for "total"...
Martin von Zweigbergk
r40882:b34b1b86 default
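The change itself is small: request() first binds len(fileids) to "count", aliases it to "total" a few lines later, and then rebinds "count" as the running progress counter. The rewrite binds the length to "total" from the start so each name keeps a single meaning throughout the method. A condensed before/after sketch (not the actual Mercurial code, which follows in full below):

# Before: "count" temporarily means "total", then becomes a counter.
def request_before(fileids):
    count = len(fileids)
    request = "get\n%d\n" % count
    total = count      # alias introduced only to free up the name
    count = 0          # rebound as the running progress counter
    return request, total, count

# After: "total" is bound once; "count" is only ever the counter.
def request_after(fileids):
    total = len(fileids)
    request = "get\n%d\n" % total
    count = 0
    return request, total, count

assert request_before(['a', 'b']) == request_after(['a', 'b'])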
@@ -1,589 +1,588 @@
1 1 # fileserverclient.py - client for communicating with the cache process
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import io
12 12 import os
13 13 import threading
14 14 import time
15 15 import zlib
16 16
17 17 from mercurial.i18n import _
18 18 from mercurial.node import bin, hex, nullid
19 19 from mercurial import (
20 20 error,
21 21 node,
22 22 pycompat,
23 23 revlog,
24 24 sshpeer,
25 25 util,
26 26 wireprotov1peer,
27 27 )
28 28 from mercurial.utils import procutil
29 29
30 30 from . import (
31 31 constants,
32 32 contentstore,
33 33 metadatastore,
34 34 )
35 35
36 36 _sshv1peer = sshpeer.sshv1peer
37 37
38 38 # Statistics for debugging
39 39 fetchcost = 0
40 40 fetches = 0
41 41 fetched = 0
42 42 fetchmisses = 0
43 43
44 44 _lfsmod = None
45 45
46 46 def getcachekey(reponame, file, id):
47 47 pathhash = node.hex(hashlib.sha1(file).digest())
48 48 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
49 49
50 50 def getlocalkey(file, id):
51 51 pathhash = node.hex(hashlib.sha1(file).digest())
52 52 return os.path.join(pathhash, id)
53 53
54 54 def peersetup(ui, peer):
55 55
56 56 class remotefilepeer(peer.__class__):
57 57 @wireprotov1peer.batchable
58 58 def x_rfl_getfile(self, file, node):
59 59 if not self.capable('x_rfl_getfile'):
60 60 raise error.Abort(
61 61 'configured remotefile server does not support getfile')
62 62 f = wireprotov1peer.future()
63 63 yield {'file': file, 'node': node}, f
64 64 code, data = f.value.split('\0', 1)
65 65 if int(code):
66 66 raise error.LookupError(file, node, data)
67 67 yield data
68 68
69 69 @wireprotov1peer.batchable
70 70 def x_rfl_getflogheads(self, path):
71 71 if not self.capable('x_rfl_getflogheads'):
72 72 raise error.Abort('configured remotefile server does not '
73 73 'support getflogheads')
74 74 f = wireprotov1peer.future()
75 75 yield {'path': path}, f
76 76 heads = f.value.split('\n') if f.value else []
77 77 yield heads
78 78
79 79 def _updatecallstreamopts(self, command, opts):
80 80 if command != 'getbundle':
81 81 return
82 82 if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
83 83 not in self.capabilities()):
84 84 return
85 85 if not util.safehasattr(self, '_localrepo'):
86 86 return
87 87 if (constants.SHALLOWREPO_REQUIREMENT
88 88 not in self._localrepo.requirements):
89 89 return
90 90
91 91 bundlecaps = opts.get('bundlecaps')
92 92 if bundlecaps:
93 93 bundlecaps = [bundlecaps]
94 94 else:
95 95 bundlecaps = []
96 96
97 97 # shallow, includepattern, and excludepattern are a hacky way of
98 98 # carrying over data from the local repo to this getbundle
99 99 # command. We need to do it this way because bundle1 getbundle
100 100 # doesn't provide any other place we can hook in to manipulate
101 101 # getbundle args before it goes across the wire. Once we get rid
102 102 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
103 103 # do this more cleanly.
104 104 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
105 105 if self._localrepo.includepattern:
106 106 patterns = '\0'.join(self._localrepo.includepattern)
107 107 includecap = "includepattern=" + patterns
108 108 bundlecaps.append(includecap)
109 109 if self._localrepo.excludepattern:
110 110 patterns = '\0'.join(self._localrepo.excludepattern)
111 111 excludecap = "excludepattern=" + patterns
112 112 bundlecaps.append(excludecap)
113 113 opts['bundlecaps'] = ','.join(bundlecaps)
114 114
115 115 def _sendrequest(self, command, args, **opts):
116 116 self._updatecallstreamopts(command, args)
117 117 return super(remotefilepeer, self)._sendrequest(command, args,
118 118 **opts)
119 119
120 120 def _callstream(self, command, **opts):
121 121 supertype = super(remotefilepeer, self)
122 122 if not util.safehasattr(supertype, '_sendrequest'):
123 123 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
124 124 return super(remotefilepeer, self)._callstream(command, **opts)
125 125
126 126 peer.__class__ = remotefilepeer
127 127
128 128 class cacheconnection(object):
129 129 """The connection for communicating with the remote cache. Performs
130 130 gets and sets by communicating with an external process that has the
131 131 cache-specific implementation.
132 132 """
133 133 def __init__(self):
134 134 self.pipeo = self.pipei = self.pipee = None
135 135 self.subprocess = None
136 136 self.connected = False
137 137
138 138 def connect(self, cachecommand):
139 139 if self.pipeo:
140 140 raise error.Abort(_("cache connection already open"))
141 141 self.pipei, self.pipeo, self.pipee, self.subprocess = \
142 142 procutil.popen4(cachecommand)
143 143 self.connected = True
144 144
145 145 def close(self):
146 146 def tryclose(pipe):
147 147 try:
148 148 pipe.close()
149 149 except Exception:
150 150 pass
151 151 if self.connected:
152 152 try:
153 153 self.pipei.write("exit\n")
154 154 except Exception:
155 155 pass
156 156 tryclose(self.pipei)
157 157 self.pipei = None
158 158 tryclose(self.pipeo)
159 159 self.pipeo = None
160 160 tryclose(self.pipee)
161 161 self.pipee = None
162 162 try:
163 163 # Wait for process to terminate, making sure to avoid deadlock.
164 164 # See https://docs.python.org/2/library/subprocess.html for
165 165 # warnings about wait() and deadlocking.
166 166 self.subprocess.communicate()
167 167 except Exception:
168 168 pass
169 169 self.subprocess = None
170 170 self.connected = False
171 171
172 172 def request(self, request, flush=True):
173 173 if self.connected:
174 174 try:
175 175 self.pipei.write(request)
176 176 if flush:
177 177 self.pipei.flush()
178 178 except IOError:
179 179 self.close()
180 180
181 181 def receiveline(self):
182 182 if not self.connected:
183 183 return None
184 184 try:
185 185 result = self.pipeo.readline()[:-1]
186 186 if not result:
187 187 self.close()
188 188 except IOError:
189 189 self.close()
190 190
191 191 return result
192 192
193 193 def _getfilesbatch(
194 194 remote, receivemissing, progresstick, missed, idmap, batchsize):
195 195 # Over http(s), iterbatch is a streamy method and we can start
196 196 # looking at results early. This means we send one (potentially
197 197 # large) request, but then we show nice progress as we process
198 198 # file results, rather than showing chunks of $batchsize in
199 199 # progress.
200 200 #
201 201 # Over ssh, iterbatch isn't streamy because batch() wasn't
202 202 # explicitly designed as a streaming method. In the future we
203 203 # should probably introduce a streambatch() method upstream and
204 204 # use that for this.
205 205 with remote.commandexecutor() as e:
206 206 futures = []
207 207 for m in missed:
208 208 futures.append(e.callcommand('x_rfl_getfile', {
209 209 'file': idmap[m],
210 210 'node': m[-40:]
211 211 }))
212 212
213 213 for i, m in enumerate(missed):
214 214 r = futures[i].result()
215 215 futures[i] = None # release memory
216 216 file_ = idmap[m]
217 217 node = m[-40:]
218 218 receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
219 219 progresstick()
220 220
221 221 def _getfiles_optimistic(
222 222 remote, receivemissing, progresstick, missed, idmap, step):
223 223 remote._callstream("x_rfl_getfiles")
224 224 i = 0
225 225 pipeo = remote._pipeo
226 226 pipei = remote._pipei
227 227 while i < len(missed):
228 228 # issue a batch of requests
229 229 start = i
230 230 end = min(len(missed), start + step)
231 231 i = end
232 232 for missingid in missed[start:end]:
233 233 # issue new request
234 234 versionid = missingid[-40:]
235 235 file = idmap[missingid]
236 236 sshrequest = "%s%s\n" % (versionid, file)
237 237 pipeo.write(sshrequest)
238 238 pipeo.flush()
239 239
240 240 # receive batch results
241 241 for missingid in missed[start:end]:
242 242 versionid = missingid[-40:]
243 243 file = idmap[missingid]
244 244 receivemissing(pipei, file, versionid)
245 245 progresstick()
246 246
247 247 # End the command
248 248 pipeo.write('\n')
249 249 pipeo.flush()
250 250
251 251 def _getfiles_threaded(
252 252 remote, receivemissing, progresstick, missed, idmap, step):
253 253 remote._callstream("getfiles")
254 254 pipeo = remote._pipeo
255 255 pipei = remote._pipei
256 256
257 257 def writer():
258 258 for missingid in missed:
259 259 versionid = missingid[-40:]
260 260 file = idmap[missingid]
261 261 sshrequest = "%s%s\n" % (versionid, file)
262 262 pipeo.write(sshrequest)
263 263 pipeo.flush()
264 264 writerthread = threading.Thread(target=writer)
265 265 writerthread.daemon = True
266 266 writerthread.start()
267 267
268 268 for missingid in missed:
269 269 versionid = missingid[-40:]
270 270 file = idmap[missingid]
271 271 receivemissing(pipei, file, versionid)
272 272 progresstick()
273 273
274 274 writerthread.join()
275 275 # End the command
276 276 pipeo.write('\n')
277 277 pipeo.flush()
278 278
279 279 class fileserverclient(object):
280 280 """A client for requesting files from the remote file server.
281 281 """
282 282 def __init__(self, repo):
283 283 ui = repo.ui
284 284 self.repo = repo
285 285 self.ui = ui
286 286 self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
287 287 if self.cacheprocess:
288 288 self.cacheprocess = util.expandpath(self.cacheprocess)
289 289
290 290 # This option causes remotefilelog to pass the full file path to the
291 291 # cacheprocess instead of a hashed key.
292 292 self.cacheprocesspasspath = ui.configbool(
293 293 "remotefilelog", "cacheprocess.includepath")
294 294
295 295 self.debugoutput = ui.configbool("remotefilelog", "debug")
296 296
297 297 self.remotecache = cacheconnection()
298 298
299 299 def setstore(self, datastore, historystore, writedata, writehistory):
300 300 self.datastore = datastore
301 301 self.historystore = historystore
302 302 self.writedata = writedata
303 303 self.writehistory = writehistory
304 304
305 305 def _connect(self):
306 306 return self.repo.connectionpool.get(self.repo.fallbackpath)
307 307
308 308 def request(self, fileids):
309 309 """Takes a list of filename/node pairs and fetches them from the
310 310 server. Files are stored in the local cache.
311 311 A list of nodes that the server couldn't find is returned.
312 312 If the connection fails, an exception is raised.
313 313 """
314 314 if not self.remotecache.connected:
315 315 self.connect()
316 316 cache = self.remotecache
317 317 writedata = self.writedata
318 318
319 319 repo = self.repo
320 count = len(fileids)
321 request = "get\n%d\n" % count
320 total = len(fileids)
321 request = "get\n%d\n" % total
322 322 idmap = {}
323 323 reponame = repo.name
324 324 for file, id in fileids:
325 325 fullid = getcachekey(reponame, file, id)
326 326 if self.cacheprocesspasspath:
327 327 request += file + '\0'
328 328 request += fullid + "\n"
329 329 idmap[fullid] = file
330 330
331 331 cache.request(request)
332 332
333 total = count
334 progress = self.ui.makeprogress(_('downloading'), total=count)
333 progress = self.ui.makeprogress(_('downloading'), total=total)
335 334 progress.update(0)
336 335
337 336 missed = []
338 337 count = 0
339 338 while True:
340 339 missingid = cache.receiveline()
341 340 if not missingid:
342 341 missedset = set(missed)
343 342 for missingid in idmap:
344 343 if not missingid in missedset:
345 344 missed.append(missingid)
346 345 self.ui.warn(_("warning: cache connection closed early - " +
347 346 "falling back to server\n"))
348 347 break
349 348 if missingid == "0":
350 349 break
351 350 if missingid.startswith("_hits_"):
352 351 # receive progress reports
353 352 parts = missingid.split("_")
354 353 count += int(parts[2])
355 354 progress.update(count)
356 355 continue
357 356
358 357 missed.append(missingid)
359 358
360 359 global fetchmisses
361 360 fetchmisses += len(missed)
362 361
363 362 count = [total - len(missed)]
364 363 fromcache = count[0]
365 364 progress.update(count[0], total=total)
366 365 self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
367 366 count[0], total, hit=count[0], total=total)
368 367
369 368 oldumask = os.umask(0o002)
370 369 try:
371 370 # receive cache misses from master
372 371 if missed:
373 372 def progresstick():
374 373 count[0] += 1
375 374 progress.update(count[0])
376 375 # When verbose is true, sshpeer prints 'running ssh...'
377 376 # to stdout, which can interfere with some command
378 377 # outputs
379 378 verbose = self.ui.verbose
380 379 self.ui.verbose = False
381 380 try:
382 381 with self._connect() as conn:
383 382 remote = conn.peer
384 383 if remote.capable(
385 384 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
386 385 if not isinstance(remote, _sshv1peer):
387 386 raise error.Abort('remotefilelog requires ssh '
388 387 'servers')
389 388 step = self.ui.configint('remotefilelog',
390 389 'getfilesstep')
391 390 getfilestype = self.ui.config('remotefilelog',
392 391 'getfilestype')
393 392 if getfilestype == 'threaded':
394 393 _getfiles = _getfiles_threaded
395 394 else:
396 395 _getfiles = _getfiles_optimistic
397 396 _getfiles(remote, self.receivemissing, progresstick,
398 397 missed, idmap, step)
399 398 elif remote.capable("x_rfl_getfile"):
400 399 if remote.capable('batch'):
401 400 batchdefault = 100
402 401 else:
403 402 batchdefault = 10
404 403 batchsize = self.ui.configint(
405 404 'remotefilelog', 'batchsize', batchdefault)
406 405 _getfilesbatch(
407 406 remote, self.receivemissing, progresstick,
408 407 missed, idmap, batchsize)
409 408 else:
410 409 raise error.Abort("configured remotefilelog server"
411 410 " does not support remotefilelog")
412 411
413 412 self.ui.log("remotefilefetchlog",
414 413 "Success\n",
415 414 fetched_files = count[0] - fromcache,
416 415 total_to_fetch = total - fromcache)
417 416 except Exception:
418 417 self.ui.log("remotefilefetchlog",
419 418 "Fail\n",
420 419 fetched_files = count[0] - fromcache,
421 420 total_to_fetch = total - fromcache)
422 421 raise
423 422 finally:
424 423 self.ui.verbose = verbose
425 424 # send to memcache
426 425 count[0] = len(missed)
427 426 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
428 427 cache.request(request)
429 428
430 429 progress.complete()
431 430
432 431 # mark ourselves as a user of this cache
433 432 writedata.markrepo(self.repo.path)
434 433 finally:
435 434 os.umask(oldumask)
436 435
437 436 def receivemissing(self, pipe, filename, node):
438 437 line = pipe.readline()[:-1]
439 438 if not line:
440 439 raise error.ResponseError(_("error downloading file contents:"),
441 440 _("connection closed early"))
442 441 size = int(line)
443 442 data = pipe.read(size)
444 443 if len(data) != size:
445 444 raise error.ResponseError(_("error downloading file contents:"),
446 445 _("only received %s of %s bytes")
447 446 % (len(data), size))
448 447
449 448 self.writedata.addremotefilelognode(filename, bin(node),
450 449 zlib.decompress(data))
451 450
452 451 def connect(self):
453 452 if self.cacheprocess:
454 453 cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
455 454 self.remotecache.connect(cmd)
456 455 else:
457 456 # If no cache process is specified, we fake one that always
458 457 # returns cache misses. This enables tests to run easily
459 458 # and may eventually allow us to be a drop in replacement
460 459 # for the largefiles extension.
461 460 class simplecache(object):
462 461 def __init__(self):
463 462 self.missingids = []
464 463 self.connected = True
465 464
466 465 def close(self):
467 466 pass
468 467
469 468 def request(self, value, flush=True):
470 469 lines = value.split("\n")
471 470 if lines[0] != "get":
472 471 return
473 472 self.missingids = lines[2:-1]
474 473 self.missingids.append('0')
475 474
476 475 def receiveline(self):
477 476 if len(self.missingids) > 0:
478 477 return self.missingids.pop(0)
479 478 return None
480 479
481 480 self.remotecache = simplecache()
482 481
483 482 def close(self):
484 483 if fetches:
485 484 msg = ("%d files fetched over %d fetches - " +
486 485 "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
487 486 fetched,
488 487 fetches,
489 488 fetchmisses,
490 489 float(fetched - fetchmisses) / float(fetched) * 100.0,
491 490 fetchcost)
492 491 if self.debugoutput:
493 492 self.ui.warn(msg)
494 493 self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
495 494 remotefilelogfetched=fetched,
496 495 remotefilelogfetches=fetches,
497 496 remotefilelogfetchmisses=fetchmisses,
498 497 remotefilelogfetchtime=fetchcost * 1000)
499 498
500 499 if self.remotecache.connected:
501 500 self.remotecache.close()
502 501
503 502 def prefetch(self, fileids, force=False, fetchdata=True,
504 503 fetchhistory=False):
505 504 """downloads the given file versions to the cache
506 505 """
507 506 repo = self.repo
508 507 idstocheck = []
509 508 for file, id in fileids:
510 509 # hack
511 510 # - we don't use .hgtags
512 511 # - workingctx produces ids with length 42,
513 512 # which we skip since they aren't in any cache
514 513 if (file == '.hgtags' or len(id) == 42
515 514 or not repo.shallowmatch(file)):
516 515 continue
517 516
518 517 idstocheck.append((file, bin(id)))
519 518
520 519 datastore = self.datastore
521 520 historystore = self.historystore
522 521 if force:
523 522 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
524 523 historystore = metadatastore.unionmetadatastore(
525 524 *repo.sharedhistorystores)
526 525
527 526 missingids = set()
528 527 if fetchdata:
529 528 missingids.update(datastore.getmissing(idstocheck))
530 529 if fetchhistory:
531 530 missingids.update(historystore.getmissing(idstocheck))
532 531
533 532 # partition missing nodes into nullid and not-nullid so we can
534 533 # warn about this filtering potentially shadowing bugs.
535 534 nullids = len([None for unused, id in missingids if id == nullid])
536 535 if nullids:
537 536 missingids = [(f, id) for f, id in missingids if id != nullid]
538 537 repo.ui.develwarn(
539 538 ('remotefilelog not fetching %d null revs'
540 539 ' - this is likely hiding bugs' % nullids),
541 540 config='remotefilelog-ext')
542 541 if missingids:
543 542 global fetches, fetched, fetchcost
544 543 fetches += 1
545 544
546 545 # We want to be able to detect excess individual file downloads, so
547 546 # let's log that information for debugging.
548 547 if fetches >= 15 and fetches < 18:
549 548 if fetches == 15:
550 549 fetchwarning = self.ui.config('remotefilelog',
551 550 'fetchwarning')
552 551 if fetchwarning:
553 552 self.ui.warn(fetchwarning + '\n')
554 553 self.logstacktrace()
555 554 missingids = [(file, hex(id)) for file, id in missingids]
556 555 fetched += len(missingids)
557 556 start = time.time()
558 557 missingids = self.request(missingids)
559 558 if missingids:
560 559 raise error.Abort(_("unable to download %d files") %
561 560 len(missingids))
562 561 fetchcost += time.time() - start
563 562 self._lfsprefetch(fileids)
564 563
565 564 def _lfsprefetch(self, fileids):
566 565 if not _lfsmod or not util.safehasattr(
567 566 self.repo.svfs, 'lfslocalblobstore'):
568 567 return
569 568 if not _lfsmod.wrapper.candownload(self.repo):
570 569 return
571 570 pointers = []
572 571 store = self.repo.svfs.lfslocalblobstore
573 572 for file, id in fileids:
574 573 node = bin(id)
575 574 rlog = self.repo.file(file)
576 575 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
577 576 text = rlog.revision(node, raw=True)
578 577 p = _lfsmod.pointer.deserialize(text)
579 578 oid = p.oid()
580 579 if not store.has(oid):
581 580 pointers.append(p)
582 581 if len(pointers) > 0:
583 582 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
584 583 assert all(store.has(p.oid()) for p in pointers)
585 584
586 585 def logstacktrace(self):
587 586 import traceback
588 587 self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
589 588 ''.join(traceback.format_stack()))
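For reference, the cache-process wire format that request() speaks above, as parsed by the fallback simplecache class, is "get\n<count>\n<key>\n...<key>\n"; the process answers one missing key per line, may interleave "_hits_<n>_" lines to report batches of hits for progress, and ends with a literal "0". A rough standalone demo (names invented, flush argument omitted) of the fallback behavior, which reports every key as missing:

class SimpleCacheDemo(object):
    """Mimics simplecache above: every requested key is a miss."""
    def __init__(self):
        self.missingids = []

    def request(self, value):
        lines = value.split("\n")
        if lines[0] != "get":
            return
        self.missingids = lines[2:-1]  # skip "get" and the count line
        self.missingids.append("0")    # protocol terminator

    def receiveline(self):
        return self.missingids.pop(0) if self.missingids else None

cache = SimpleCacheDemo()
cache.request("get\n2\nrepo/ab/cdef0123/1111\nrepo/cd/ef012345/2222\n")
while True:
    line = cache.receiveline()
    print(line)
    if line == "0":  # all misses reported
        break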