remotefilelog: use progress helper in fileserverclient...
Martin von Zweigbergk
r40881:e58cd7ed default
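This change replaces remotefilelog's direct ui.progress() calls (and the shared module-level _downloading topic constant) with the ui.makeprogress() helper: the topic and total are stated once, and the returned progress object is driven through its update() and complete() methods. A minimal sketch of the pattern, distilled from the diff below (the fetchone() helper and items list are illustrative, not part of the commit):

    from mercurial.i18n import _

    def download(ui, items):
        # One progress bar for the whole operation; topic and total are
        # given once instead of being repeated on every ui.progress() call.
        progress = ui.makeprogress(_('downloading'), total=len(items))
        progress.update(0)            # was: ui.progress(topic, 0, total=...)
        for i, item in enumerate(items, 1):
            fetchone(item)            # illustrative per-item work
            progress.update(i)        # was: ui.progress(topic, i, total=...)
        progress.complete()           # was: ui.progress(topic, None)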
@@ -1,589 +1,589 @@
 # fileserverclient.py - client for communicating with the cache process
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import hashlib
 import io
 import os
 import threading
 import time
 import zlib
 
 from mercurial.i18n import _
 from mercurial.node import bin, hex, nullid
 from mercurial import (
     error,
     node,
     pycompat,
     revlog,
     sshpeer,
     util,
     wireprotov1peer,
 )
 from mercurial.utils import procutil
 
 from . import (
     constants,
     contentstore,
     metadatastore,
 )
 
 _sshv1peer = sshpeer.sshv1peer
 
 # Statistics for debugging
 fetchcost = 0
 fetches = 0
 fetched = 0
 fetchmisses = 0
 
 _lfsmod = None
-_downloading = _('downloading')
 
 def getcachekey(reponame, file, id):
     pathhash = node.hex(hashlib.sha1(file).digest())
     return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
 
 def getlocalkey(file, id):
     pathhash = node.hex(hashlib.sha1(file).digest())
     return os.path.join(pathhash, id)
 
 def peersetup(ui, peer):
 
     class remotefilepeer(peer.__class__):
         @wireprotov1peer.batchable
         def x_rfl_getfile(self, file, node):
             if not self.capable('x_rfl_getfile'):
                 raise error.Abort(
                     'configured remotefile server does not support getfile')
             f = wireprotov1peer.future()
             yield {'file': file, 'node': node}, f
             code, data = f.value.split('\0', 1)
             if int(code):
                 raise error.LookupError(file, node, data)
             yield data
 
         @wireprotov1peer.batchable
         def x_rfl_getflogheads(self, path):
             if not self.capable('x_rfl_getflogheads'):
                 raise error.Abort('configured remotefile server does not '
                                   'support getflogheads')
             f = wireprotov1peer.future()
             yield {'path': path}, f
             heads = f.value.split('\n') if f.value else []
             yield heads
 
         def _updatecallstreamopts(self, command, opts):
             if command != 'getbundle':
                 return
             if (constants.NETWORK_CAP_LEGACY_SSH_GETFILES
                 not in self.capabilities()):
                 return
             if not util.safehasattr(self, '_localrepo'):
                 return
             if (constants.SHALLOWREPO_REQUIREMENT
                 not in self._localrepo.requirements):
                 return
 
             bundlecaps = opts.get('bundlecaps')
             if bundlecaps:
                 bundlecaps = [bundlecaps]
             else:
                 bundlecaps = []
 
             # shallow, includepattern, and excludepattern are a hacky way of
             # carrying over data from the local repo to this getbundle
             # command. We need to do it this way because bundle1 getbundle
             # doesn't provide any other place we can hook in to manipulate
             # getbundle args before it goes across the wire. Once we get rid
             # of bundle1, we can use bundle2's _pullbundle2extraprepare to
             # do this more cleanly.
             bundlecaps.append(constants.BUNDLE2_CAPABLITY)
             if self._localrepo.includepattern:
                 patterns = '\0'.join(self._localrepo.includepattern)
                 includecap = "includepattern=" + patterns
                 bundlecaps.append(includecap)
             if self._localrepo.excludepattern:
                 patterns = '\0'.join(self._localrepo.excludepattern)
                 excludecap = "excludepattern=" + patterns
                 bundlecaps.append(excludecap)
             opts['bundlecaps'] = ','.join(bundlecaps)
 
         def _sendrequest(self, command, args, **opts):
             self._updatecallstreamopts(command, args)
             return super(remotefilepeer, self)._sendrequest(command, args,
                                                             **opts)
 
         def _callstream(self, command, **opts):
             supertype = super(remotefilepeer, self)
             if not util.safehasattr(supertype, '_sendrequest'):
                 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
             return super(remotefilepeer, self)._callstream(command, **opts)
 
     peer.__class__ = remotefilepeer
 
 class cacheconnection(object):
     """The connection for communicating with the remote cache. Performs
     gets and sets by communicating with an external process that has the
     cache-specific implementation.
     """
     def __init__(self):
         self.pipeo = self.pipei = self.pipee = None
         self.subprocess = None
         self.connected = False
 
     def connect(self, cachecommand):
         if self.pipeo:
             raise error.Abort(_("cache connection already open"))
         self.pipei, self.pipeo, self.pipee, self.subprocess = \
             procutil.popen4(cachecommand)
         self.connected = True
 
     def close(self):
         def tryclose(pipe):
             try:
                 pipe.close()
             except Exception:
                 pass
         if self.connected:
             try:
                 self.pipei.write("exit\n")
             except Exception:
                 pass
             tryclose(self.pipei)
             self.pipei = None
             tryclose(self.pipeo)
             self.pipeo = None
             tryclose(self.pipee)
             self.pipee = None
             try:
                 # Wait for process to terminate, making sure to avoid deadlock.
                 # See https://docs.python.org/2/library/subprocess.html for
                 # warnings about wait() and deadlocking.
                 self.subprocess.communicate()
             except Exception:
                 pass
             self.subprocess = None
         self.connected = False
 
     def request(self, request, flush=True):
         if self.connected:
             try:
                 self.pipei.write(request)
                 if flush:
                     self.pipei.flush()
             except IOError:
                 self.close()
 
     def receiveline(self):
         if not self.connected:
             return None
         try:
             result = self.pipeo.readline()[:-1]
             if not result:
                 self.close()
         except IOError:
             self.close()
 
         return result
 
 def _getfilesbatch(
         remote, receivemissing, progresstick, missed, idmap, batchsize):
     # Over http(s), iterbatch is a streamy method and we can start
     # looking at results early. This means we send one (potentially
     # large) request, but then we show nice progress as we process
     # file results, rather than showing chunks of $batchsize in
     # progress.
     #
     # Over ssh, iterbatch isn't streamy because batch() wasn't
     # explicitly designed as a streaming method. In the future we
     # should probably introduce a streambatch() method upstream and
     # use that for this.
     with remote.commandexecutor() as e:
         futures = []
         for m in missed:
             futures.append(e.callcommand('x_rfl_getfile', {
                 'file': idmap[m],
                 'node': m[-40:]
             }))
 
         for i, m in enumerate(missed):
             r = futures[i].result()
             futures[i] = None # release memory
             file_ = idmap[m]
             node = m[-40:]
             receivemissing(io.BytesIO('%d\n%s' % (len(r), r)), file_, node)
             progresstick()
 
 def _getfiles_optimistic(
         remote, receivemissing, progresstick, missed, idmap, step):
     remote._callstream("x_rfl_getfiles")
     i = 0
     pipeo = remote._pipeo
     pipei = remote._pipei
     while i < len(missed):
         # issue a batch of requests
         start = i
         end = min(len(missed), start + step)
         i = end
         for missingid in missed[start:end]:
             # issue new request
             versionid = missingid[-40:]
             file = idmap[missingid]
             sshrequest = "%s%s\n" % (versionid, file)
             pipeo.write(sshrequest)
         pipeo.flush()
 
         # receive batch results
         for missingid in missed[start:end]:
             versionid = missingid[-40:]
             file = idmap[missingid]
             receivemissing(pipei, file, versionid)
             progresstick()
 
     # End the command
     pipeo.write('\n')
     pipeo.flush()
 
 def _getfiles_threaded(
         remote, receivemissing, progresstick, missed, idmap, step):
     remote._callstream("getfiles")
     pipeo = remote._pipeo
     pipei = remote._pipei
 
     def writer():
         for missingid in missed:
             versionid = missingid[-40:]
             file = idmap[missingid]
             sshrequest = "%s%s\n" % (versionid, file)
             pipeo.write(sshrequest)
         pipeo.flush()
     writerthread = threading.Thread(target=writer)
     writerthread.daemon = True
     writerthread.start()
 
     for missingid in missed:
         versionid = missingid[-40:]
         file = idmap[missingid]
         receivemissing(pipei, file, versionid)
         progresstick()
 
     writerthread.join()
     # End the command
     pipeo.write('\n')
     pipeo.flush()
 
 class fileserverclient(object):
     """A client for requesting files from the remote file server.
     """
     def __init__(self, repo):
         ui = repo.ui
         self.repo = repo
         self.ui = ui
         self.cacheprocess = ui.config("remotefilelog", "cacheprocess")
         if self.cacheprocess:
             self.cacheprocess = util.expandpath(self.cacheprocess)
 
         # This option causes remotefilelog to pass the full file path to the
         # cacheprocess instead of a hashed key.
         self.cacheprocesspasspath = ui.configbool(
             "remotefilelog", "cacheprocess.includepath")
 
         self.debugoutput = ui.configbool("remotefilelog", "debug")
 
         self.remotecache = cacheconnection()
 
     def setstore(self, datastore, historystore, writedata, writehistory):
         self.datastore = datastore
         self.historystore = historystore
         self.writedata = writedata
         self.writehistory = writehistory
 
     def _connect(self):
         return self.repo.connectionpool.get(self.repo.fallbackpath)
 
     def request(self, fileids):
         """Takes a list of filename/node pairs and fetches them from the
         server. Files are stored in the local cache.
         A list of nodes that the server couldn't find is returned.
         If the connection fails, an exception is raised.
         """
         if not self.remotecache.connected:
             self.connect()
         cache = self.remotecache
         writedata = self.writedata
 
         repo = self.repo
         count = len(fileids)
         request = "get\n%d\n" % count
         idmap = {}
         reponame = repo.name
         for file, id in fileids:
             fullid = getcachekey(reponame, file, id)
             if self.cacheprocesspasspath:
                 request += file + '\0'
             request += fullid + "\n"
             idmap[fullid] = file
 
         cache.request(request)
 
         total = count
-        self.ui.progress(_downloading, 0, total=count)
+        progress = self.ui.makeprogress(_('downloading'), total=count)
+        progress.update(0)
 
         missed = []
         count = 0
         while True:
             missingid = cache.receiveline()
             if not missingid:
                 missedset = set(missed)
                 for missingid in idmap:
                     if not missingid in missedset:
                         missed.append(missingid)
                 self.ui.warn(_("warning: cache connection closed early - " +
                                "falling back to server\n"))
                 break
             if missingid == "0":
                 break
             if missingid.startswith("_hits_"):
                 # receive progress reports
                 parts = missingid.split("_")
                 count += int(parts[2])
-                self.ui.progress(_downloading, count, total=total)
+                progress.update(count)
                 continue
 
             missed.append(missingid)
 
         global fetchmisses
         fetchmisses += len(missed)
 
         count = [total - len(missed)]
         fromcache = count[0]
-        self.ui.progress(_downloading, count[0], total=total)
+        progress.update(count[0], total=total)
         self.ui.log("remotefilelog", "remote cache hit rate is %r of %r\n",
                     count[0], total, hit=count[0], total=total)
 
         oldumask = os.umask(0o002)
         try:
             # receive cache misses from master
             if missed:
                 def progresstick():
                     count[0] += 1
-                    self.ui.progress(_downloading, count[0], total=total)
+                    progress.update(count[0])
                 # When verbose is true, sshpeer prints 'running ssh...'
                 # to stdout, which can interfere with some command
                 # outputs
                 verbose = self.ui.verbose
                 self.ui.verbose = False
                 try:
                     with self._connect() as conn:
                         remote = conn.peer
                         if remote.capable(
                                 constants.NETWORK_CAP_LEGACY_SSH_GETFILES):
                             if not isinstance(remote, _sshv1peer):
                                 raise error.Abort('remotefilelog requires ssh '
                                                   'servers')
                             step = self.ui.configint('remotefilelog',
                                                      'getfilesstep')
                             getfilestype = self.ui.config('remotefilelog',
                                                           'getfilestype')
                             if getfilestype == 'threaded':
                                 _getfiles = _getfiles_threaded
                             else:
                                 _getfiles = _getfiles_optimistic
                             _getfiles(remote, self.receivemissing, progresstick,
                                       missed, idmap, step)
                         elif remote.capable("x_rfl_getfile"):
                             if remote.capable('batch'):
                                 batchdefault = 100
                             else:
                                 batchdefault = 10
                             batchsize = self.ui.configint(
                                 'remotefilelog', 'batchsize', batchdefault)
                             _getfilesbatch(
                                 remote, self.receivemissing, progresstick,
                                 missed, idmap, batchsize)
                         else:
                             raise error.Abort("configured remotefilelog server"
                                               " does not support remotefilelog")
 
                     self.ui.log("remotefilefetchlog",
                                 "Success\n",
                                 fetched_files = count[0] - fromcache,
                                 total_to_fetch = total - fromcache)
                 except Exception:
                     self.ui.log("remotefilefetchlog",
                                 "Fail\n",
                                 fetched_files = count[0] - fromcache,
                                 total_to_fetch = total - fromcache)
                     raise
                 finally:
                     self.ui.verbose = verbose
                 # send to memcache
                 count[0] = len(missed)
                 request = "set\n%d\n%s\n" % (count[0], "\n".join(missed))
                 cache.request(request)
 
-            self.ui.progress(_downloading, None)
+            progress.complete()
 
             # mark ourselves as a user of this cache
             writedata.markrepo(self.repo.path)
         finally:
             os.umask(oldumask)
 
     def receivemissing(self, pipe, filename, node):
         line = pipe.readline()[:-1]
         if not line:
             raise error.ResponseError(_("error downloading file contents:"),
                                       _("connection closed early"))
         size = int(line)
         data = pipe.read(size)
         if len(data) != size:
             raise error.ResponseError(_("error downloading file contents:"),
                                       _("only received %s of %s bytes")
                                       % (len(data), size))
 
         self.writedata.addremotefilelognode(filename, bin(node),
                                             zlib.decompress(data))
 
     def connect(self):
         if self.cacheprocess:
             cmd = "%s %s" % (self.cacheprocess, self.writedata._path)
             self.remotecache.connect(cmd)
         else:
             # If no cache process is specified, we fake one that always
             # returns cache misses. This enables tests to run easily
             # and may eventually allow us to be a drop in replacement
             # for the largefiles extension.
             class simplecache(object):
                 def __init__(self):
                     self.missingids = []
                     self.connected = True
 
                 def close(self):
                     pass
 
                 def request(self, value, flush=True):
                     lines = value.split("\n")
                     if lines[0] != "get":
                         return
                     self.missingids = lines[2:-1]
                     self.missingids.append('0')
 
                 def receiveline(self):
                     if len(self.missingids) > 0:
                         return self.missingids.pop(0)
                     return None
 
             self.remotecache = simplecache()
 
     def close(self):
         if fetches:
             msg = ("%d files fetched over %d fetches - " +
                    "(%d misses, %0.2f%% hit ratio) over %0.2fs\n") % (
                        fetched,
                        fetches,
                        fetchmisses,
                        float(fetched - fetchmisses) / float(fetched) * 100.0,
                        fetchcost)
             if self.debugoutput:
                 self.ui.warn(msg)
             self.ui.log("remotefilelog.prefetch", msg.replace("%", "%%"),
                         remotefilelogfetched=fetched,
                         remotefilelogfetches=fetches,
                         remotefilelogfetchmisses=fetchmisses,
                         remotefilelogfetchtime=fetchcost * 1000)
 
         if self.remotecache.connected:
             self.remotecache.close()
 
     def prefetch(self, fileids, force=False, fetchdata=True,
                  fetchhistory=False):
         """downloads the given file versions to the cache
         """
         repo = self.repo
         idstocheck = []
         for file, id in fileids:
             # hack
             # - we don't use .hgtags
             # - workingctx produces ids with length 42,
             #   which we skip since they aren't in any cache
             if (file == '.hgtags' or len(id) == 42
                 or not repo.shallowmatch(file)):
                 continue
 
             idstocheck.append((file, bin(id)))
 
         datastore = self.datastore
         historystore = self.historystore
         if force:
             datastore = contentstore.unioncontentstore(*repo.shareddatastores)
             historystore = metadatastore.unionmetadatastore(
                 *repo.sharedhistorystores)
 
         missingids = set()
         if fetchdata:
             missingids.update(datastore.getmissing(idstocheck))
         if fetchhistory:
             missingids.update(historystore.getmissing(idstocheck))
 
         # partition missing nodes into nullid and not-nullid so we can
         # warn about this filtering potentially shadowing bugs.
         nullids = len([None for unused, id in missingids if id == nullid])
         if nullids:
             missingids = [(f, id) for f, id in missingids if id != nullid]
             repo.ui.develwarn(
                 ('remotefilelog not fetching %d null revs'
                  ' - this is likely hiding bugs' % nullids),
                 config='remotefilelog-ext')
         if missingids:
             global fetches, fetched, fetchcost
             fetches += 1
 
             # We want to be able to detect excess individual file downloads, so
             # let's log that information for debugging.
             if fetches >= 15 and fetches < 18:
                 if fetches == 15:
                     fetchwarning = self.ui.config('remotefilelog',
                                                   'fetchwarning')
                     if fetchwarning:
                         self.ui.warn(fetchwarning + '\n')
                 self.logstacktrace()
             missingids = [(file, hex(id)) for file, id in missingids]
             fetched += len(missingids)
             start = time.time()
             missingids = self.request(missingids)
             if missingids:
                 raise error.Abort(_("unable to download %d files") %
                                   len(missingids))
             fetchcost += time.time() - start
             self._lfsprefetch(fileids)
 
     def _lfsprefetch(self, fileids):
         if not _lfsmod or not util.safehasattr(
                 self.repo.svfs, 'lfslocalblobstore'):
             return
         if not _lfsmod.wrapper.candownload(self.repo):
             return
         pointers = []
         store = self.repo.svfs.lfslocalblobstore
         for file, id in fileids:
             node = bin(id)
             rlog = self.repo.file(file)
             if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
                 text = rlog.revision(node, raw=True)
                 p = _lfsmod.pointer.deserialize(text)
                 oid = p.oid()
                 if not store.has(oid):
                     pointers.append(p)
         if len(pointers) > 0:
             self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
             assert all(store.has(p.oid()) for p in pointers)
 
     def logstacktrace(self):
         import traceback
         self.ui.log('remotefilelog', 'excess remotefilelog fetching:\n%s\n',
                     ''.join(traceback.format_stack()))
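A side note on an idiom visible in request() above: count = [total - len(missed)] boxes the counter in a one-element list because this module is Python 2-compatible code (note the __future__ import), and the nested progresstick() closure cannot rebind a plain integer without Python 3's nonlocal keyword. A standalone sketch of the same trick (the names are illustrative, not from the commit):

    def maketicker(progress):
        count = [0]                  # boxed counter: the closure mutates it in place
        def tick():
            count[0] += 1            # rebinding `count` itself would need `nonlocal`
            progress.update(count[0])
        return tick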