##// END OF EJS Templates
watchman: add the possibility to set the exact watchman binary location...
Boris Feld -
r42134:57264906 default
parent child Browse files
Show More
@@ -1,828 +1,831 b''
1 1 # __init__.py - fsmonitor initialization and overrides
2 2 #
3 3 # Copyright 2013-2016 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 '''Faster status operations with the Watchman file monitor (EXPERIMENTAL)
9 9
10 10 Integrates the file-watching program Watchman with Mercurial to produce faster
11 11 status results.
12 12
13 13 On a particular Linux system, for a real-world repository with over 400,000
14 14 files hosted on ext4, vanilla `hg status` takes 1.3 seconds. On the same
15 15 system, with fsmonitor it takes about 0.3 seconds.
16 16
17 17 fsmonitor requires no configuration -- it will tell Watchman about your
18 18 repository as necessary. You'll need to install Watchman from
19 19 https://facebook.github.io/watchman/ and make sure it is in your PATH.
20 20
21 21 fsmonitor is incompatible with the largefiles and eol extensions, and
22 22 will disable itself if any of those are active.
23 23
24 24 The following configuration options exist:
25 25
26 26 ::
27 27
28 28 [fsmonitor]
29 29 mode = {off, on, paranoid}
30 30
31 31 When `mode = off`, fsmonitor will disable itself (similar to not loading the
32 32 extension at all). When `mode = on`, fsmonitor will be enabled (the default).
33 33 When `mode = paranoid`, fsmonitor will query both Watchman and the filesystem,
34 34 and ensure that the results are consistent.
35 35
36 36 ::
37 37
38 38 [fsmonitor]
39 39 timeout = (float)
40 40
41 41 A value, in seconds, that determines how long fsmonitor will wait for Watchman
42 42 to return results. Defaults to `2.0`.
43 43
44 44 ::
45 45
46 46 [fsmonitor]
47 47 blacklistusers = (list of userids)
48 48
49 49 A list of usernames for which fsmonitor will disable itself altogether.
50 50
51 51 ::
52 52
53 53 [fsmonitor]
54 54 walk_on_invalidate = (boolean)
55 55
56 56 Whether or not to walk the whole repo ourselves when our cached state has been
57 57 invalidated, for example when Watchman has been restarted or .hgignore rules
58 58 have been changed. Walking the repo in that case can result in competing for
59 59 I/O with Watchman. For large repos it is recommended to set this value to
60 60 false. You may wish to set this to true if you have a very fast filesystem
61 61 that can outpace the IPC overhead of getting the result data for the full repo
62 62 from Watchman. Defaults to false.
63 63
64 64 ::
65 65
66 66 [fsmonitor]
67 67 warn_when_unused = (boolean)
68 68
69 69 Whether to print a warning during certain operations when fsmonitor would be
70 70 beneficial to performance but isn't enabled.
71 71
72 72 ::
73 73
74 74 [fsmonitor]
75 75 warn_update_file_count = (integer)
76 76
77 77 If ``warn_when_unused`` is set and fsmonitor isn't enabled, a warning will
78 78 be printed during working directory updates if this many files will be
79 79 created.
80 80 '''
81 81
82 82 # Platforms Supported
83 83 # ===================
84 84 #
85 85 # **Linux:** *Stable*. Watchman and fsmonitor are both known to work reliably,
86 86 # even under severe loads.
87 87 #
88 88 # **Mac OS X:** *Stable*. The Mercurial test suite passes with fsmonitor
89 89 # turned on, on case-insensitive HFS+. There has been a reasonable amount of
90 90 # user testing under normal loads.
91 91 #
92 92 # **Solaris, BSD:** *Alpha*. watchman and fsmonitor are believed to work, but
93 93 # very little testing has been done.
94 94 #
95 95 # **Windows:** *Alpha*. Not in a release version of watchman or fsmonitor yet.
96 96 #
97 97 # Known Issues
98 98 # ============
99 99 #
100 100 # * fsmonitor will disable itself if any of the following extensions are
101 101 # enabled: largefiles, inotify, eol; or if the repository has subrepos.
102 102 # * fsmonitor will produce incorrect results if nested repos that are not
103 103 # subrepos exist. *Workaround*: add nested repo paths to your `.hgignore`.
104 104 #
105 105 # The issues related to nested repos and subrepos are probably not fundamental
106 106 # ones. Patches to fix them are welcome.
107 107
108 108 from __future__ import absolute_import
109 109
110 110 import codecs
111 111 import hashlib
112 112 import os
113 113 import stat
114 114 import sys
115 115 import weakref
116 116
117 117 from mercurial.i18n import _
118 118 from mercurial.node import (
119 119 hex,
120 120 )
121 121
122 122 from mercurial import (
123 123 context,
124 124 encoding,
125 125 error,
126 126 extensions,
127 127 localrepo,
128 128 merge,
129 129 pathutil,
130 130 pycompat,
131 131 registrar,
132 132 scmutil,
133 133 util,
134 134 )
135 135 from mercurial import match as matchmod
136 136
137 137 from . import (
138 138 pywatchman,
139 139 state,
140 140 watchmanclient,
141 141 )
142 142
143 143 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
144 144 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
145 145 # be specifying the version(s) of Mercurial they are tested with, or
146 146 # leave the attribute unspecified.
147 147 testedwith = 'ships-with-hg-core'
148 148
149 149 configtable = {}
150 150 configitem = registrar.configitem(configtable)
151 151
152 152 configitem('fsmonitor', 'mode',
153 153 default='on',
154 154 )
155 155 configitem('fsmonitor', 'walk_on_invalidate',
156 156 default=False,
157 157 )
158 158 configitem('fsmonitor', 'timeout',
159 159 default='2',
160 160 )
161 161 configitem('fsmonitor', 'blacklistusers',
162 162 default=list,
163 163 )
164 configitem('fsmonitor', 'watchman_exe',
165 default='watchman',
166 )
164 167 configitem('fsmonitor', 'verbose',
165 168 default=True,
166 169 )
167 170 configitem('experimental', 'fsmonitor.transaction_notify',
168 171 default=False,
169 172 )
170 173
171 174 # This extension is incompatible with the following blacklisted extensions
172 175 # and will disable itself when encountering one of these:
173 176 _blacklist = ['largefiles', 'eol']
174 177
175 178 def _handleunavailable(ui, state, ex):
176 179 """Exception handler for Watchman interaction exceptions"""
177 180 if isinstance(ex, watchmanclient.Unavailable):
178 181 # experimental config: fsmonitor.verbose
179 182 if ex.warn and ui.configbool('fsmonitor', 'verbose'):
180 183 ui.warn(str(ex) + '\n')
181 184 if ex.invalidate:
182 185 state.invalidate()
183 186 # experimental config: fsmonitor.verbose
184 187 if ui.configbool('fsmonitor', 'verbose'):
185 188 ui.log('fsmonitor', 'Watchman unavailable: %s\n', ex.msg)
186 189 else:
187 190 ui.log('fsmonitor', 'Watchman exception: %s\n', ex)
188 191
189 192 def _hashignore(ignore):
190 193 """Calculate hash for ignore patterns and filenames
191 194
192 195 If this information changes between Mercurial invocations, we can't
193 196 rely on Watchman information anymore and have to re-scan the working
194 197 copy.
195 198
196 199 """
197 200 sha1 = hashlib.sha1()
198 201 sha1.update(repr(ignore))
199 202 return sha1.hexdigest()
200 203
201 204 _watchmanencoding = pywatchman.encoding.get_local_encoding()
202 205 _fsencoding = sys.getfilesystemencoding() or sys.getdefaultencoding()
203 206 _fixencoding = codecs.lookup(_watchmanencoding) != codecs.lookup(_fsencoding)
204 207
205 208 def _watchmantofsencoding(path):
206 209 """Fix path to match watchman and local filesystem encoding
207 210
208 211 watchman's paths encoding can differ from filesystem encoding. For example,
209 212 on Windows, it's always utf-8.
210 213 """
211 214 try:
212 215 decoded = path.decode(_watchmanencoding)
213 216 except UnicodeDecodeError as e:
214 217 raise error.Abort(str(e), hint='watchman encoding error')
215 218
216 219 try:
217 220 encoded = decoded.encode(_fsencoding, 'strict')
218 221 except UnicodeEncodeError as e:
219 222 raise error.Abort(str(e))
220 223
221 224 return encoded
222 225
223 226 def overridewalk(orig, self, match, subrepos, unknown, ignored, full=True):
224 227 '''Replacement for dirstate.walk, hooking into Watchman.
225 228
226 229 Whenever full is False, ignored is False, and the Watchman client is
227 230 available, use Watchman combined with saved state to possibly return only a
228 231 subset of files.'''
229 232 def bail(reason):
230 233 self._ui.debug('fsmonitor: fallback to core status, %s\n' % reason)
231 234 return orig(match, subrepos, unknown, ignored, full=True)
232 235
233 236 if full:
234 237 return bail('full rewalk requested')
235 238 if ignored:
236 239 return bail('listing ignored files')
237 240 if not self._watchmanclient.available():
238 241 return bail('client unavailable')
239 242 state = self._fsmonitorstate
240 243 clock, ignorehash, notefiles = state.get()
241 244 if not clock:
242 245 if state.walk_on_invalidate:
243 246 return bail('no clock')
244 247 # Initial NULL clock value, see
245 248 # https://facebook.github.io/watchman/docs/clockspec.html
246 249 clock = 'c:0:0'
247 250 notefiles = []
248 251
249 252 ignore = self._ignore
250 253 dirignore = self._dirignore
251 254 if unknown:
252 255 if _hashignore(ignore) != ignorehash and clock != 'c:0:0':
253 256 # ignore list changed -- can't rely on Watchman state any more
254 257 if state.walk_on_invalidate:
255 258 return bail('ignore rules changed')
256 259 notefiles = []
257 260 clock = 'c:0:0'
258 261 else:
259 262 # always ignore
260 263 ignore = util.always
261 264 dirignore = util.always
262 265
263 266 matchfn = match.matchfn
264 267 matchalways = match.always()
265 268 dmap = self._map
266 269 if util.safehasattr(dmap, '_map'):
267 270 # for better performance, directly access the inner dirstate map if the
268 271 # standard dirstate implementation is in use.
269 272 dmap = dmap._map
270 273 nonnormalset = self._map.nonnormalset
271 274
272 275 copymap = self._map.copymap
273 276 getkind = stat.S_IFMT
274 277 dirkind = stat.S_IFDIR
275 278 regkind = stat.S_IFREG
276 279 lnkkind = stat.S_IFLNK
277 280 join = self._join
278 281 normcase = util.normcase
279 282 fresh_instance = False
280 283
281 284 exact = skipstep3 = False
282 285 if match.isexact(): # match.exact
283 286 exact = True
284 287 dirignore = util.always # skip step 2
285 288 elif match.prefix(): # match.match, no patterns
286 289 skipstep3 = True
287 290
288 291 if not exact and self._checkcase:
289 292 # note that even though we could receive directory entries, we're only
290 293 # interested in checking if a file with the same name exists. So only
291 294 # normalize files if possible.
292 295 normalize = self._normalizefile
293 296 skipstep3 = False
294 297 else:
295 298 normalize = None
296 299
297 300 # step 1: find all explicit files
298 301 results, work, dirsnotfound = self._walkexplicit(match, subrepos)
299 302
300 303 skipstep3 = skipstep3 and not (work or dirsnotfound)
301 304 work = [d for d in work if not dirignore(d[0])]
302 305
303 306 if not work and (exact or skipstep3):
304 307 for s in subrepos:
305 308 del results[s]
306 309 del results['.hg']
307 310 return results
308 311
309 312 # step 2: query Watchman
310 313 try:
311 314 # Use the user-configured timeout for the query.
312 315 # Add a little slack over the top of the user query to allow for
313 316 # overheads while transferring the data
314 317 self._watchmanclient.settimeout(state.timeout + 0.1)
315 318 result = self._watchmanclient.command('query', {
316 319 'fields': ['mode', 'mtime', 'size', 'exists', 'name'],
317 320 'since': clock,
318 321 'expression': [
319 322 'not', [
320 323 'anyof', ['dirname', '.hg'],
321 324 ['name', '.hg', 'wholename']
322 325 ]
323 326 ],
324 327 'sync_timeout': int(state.timeout * 1000),
325 328 'empty_on_fresh_instance': state.walk_on_invalidate,
326 329 })
327 330 except Exception as ex:
328 331 _handleunavailable(self._ui, state, ex)
329 332 self._watchmanclient.clearconnection()
330 333 return bail('exception during run')
331 334 else:
332 335 # We need to propagate the last observed clock up so that we
333 336 # can use it for our next query
334 337 state.setlastclock(result['clock'])
335 338 if result['is_fresh_instance']:
336 339 if state.walk_on_invalidate:
337 340 state.invalidate()
338 341 return bail('fresh instance')
339 342 fresh_instance = True
340 343 # Ignore any prior noteable files from the state info
341 344 notefiles = []
342 345
343 346 # for file paths which require normalization and we encounter a case
344 347 # collision, we store our own foldmap
345 348 if normalize:
346 349 foldmap = dict((normcase(k), k) for k in results)
347 350
348 351 switch_slashes = pycompat.ossep == '\\'
349 352 # The order of the results is, strictly speaking, undefined.
350 353 # For case changes on a case insensitive filesystem we may receive
351 354 # two entries, one with exists=True and another with exists=False.
352 355 # The exists=True entries in the same response should be interpreted
353 356 # as being happens-after the exists=False entries due to the way that
354 357 # Watchman tracks files. We use this property to reconcile deletes
355 358 # for name case changes.
356 359 for entry in result['files']:
357 360 fname = entry['name']
358 361 if _fixencoding:
359 362 fname = _watchmantofsencoding(fname)
360 363 if switch_slashes:
361 364 fname = fname.replace('\\', '/')
362 365 if normalize:
363 366 normed = normcase(fname)
364 367 fname = normalize(fname, True, True)
365 368 foldmap[normed] = fname
366 369 fmode = entry['mode']
367 370 fexists = entry['exists']
368 371 kind = getkind(fmode)
369 372
370 373 if '/.hg/' in fname or fname.endswith('/.hg'):
371 374 return bail('nested-repo-detected')
372 375
373 376 if not fexists:
374 377 # if marked as deleted and we don't already have a change
375 378 # record, mark it as deleted. If we already have an entry
376 379 # for fname then it was either part of walkexplicit or was
377 380 # an earlier result that was a case change
378 381 if fname not in results and fname in dmap and (
379 382 matchalways or matchfn(fname)):
380 383 results[fname] = None
381 384 elif kind == dirkind:
382 385 if fname in dmap and (matchalways or matchfn(fname)):
383 386 results[fname] = None
384 387 elif kind == regkind or kind == lnkkind:
385 388 if fname in dmap:
386 389 if matchalways or matchfn(fname):
387 390 results[fname] = entry
388 391 elif (matchalways or matchfn(fname)) and not ignore(fname):
389 392 results[fname] = entry
390 393 elif fname in dmap and (matchalways or matchfn(fname)):
391 394 results[fname] = None
392 395
393 396 # step 3: query notable files we don't already know about
394 397 # XXX try not to iterate over the entire dmap
395 398 if normalize:
396 399 # any notable files that have changed case will already be handled
397 400 # above, so just check membership in the foldmap
398 401 notefiles = set((normalize(f, True, True) for f in notefiles
399 402 if normcase(f) not in foldmap))
400 403 visit = set((f for f in notefiles if (f not in results and matchfn(f)
401 404 and (f in dmap or not ignore(f)))))
402 405
403 406 if not fresh_instance:
404 407 if matchalways:
405 408 visit.update(f for f in nonnormalset if f not in results)
406 409 visit.update(f for f in copymap if f not in results)
407 410 else:
408 411 visit.update(f for f in nonnormalset
409 412 if f not in results and matchfn(f))
410 413 visit.update(f for f in copymap
411 414 if f not in results and matchfn(f))
412 415 else:
413 416 if matchalways:
414 417 visit.update(f for f, st in dmap.iteritems() if f not in results)
415 418 visit.update(f for f in copymap if f not in results)
416 419 else:
417 420 visit.update(f for f, st in dmap.iteritems()
418 421 if f not in results and matchfn(f))
419 422 visit.update(f for f in copymap
420 423 if f not in results and matchfn(f))
421 424
422 425 audit = pathutil.pathauditor(self._root, cached=True).check
423 426 auditpass = [f for f in visit if audit(f)]
424 427 auditpass.sort()
425 428 auditfail = visit.difference(auditpass)
426 429 for f in auditfail:
427 430 results[f] = None
428 431
429 432 nf = iter(auditpass).next
430 433 for st in util.statfiles([join(f) for f in auditpass]):
431 434 f = nf()
432 435 if st or f in dmap:
433 436 results[f] = st
434 437
435 438 for s in subrepos:
436 439 del results[s]
437 440 del results['.hg']
438 441 return results
439 442
440 443 def overridestatus(
441 444 orig, self, node1='.', node2=None, match=None, ignored=False,
442 445 clean=False, unknown=False, listsubrepos=False):
443 446 listignored = ignored
444 447 listclean = clean
445 448 listunknown = unknown
446 449
447 450 def _cmpsets(l1, l2):
448 451 try:
449 452 if 'FSMONITOR_LOG_FILE' in encoding.environ:
450 453 fn = encoding.environ['FSMONITOR_LOG_FILE']
451 454 f = open(fn, 'wb')
452 455 else:
453 456 fn = 'fsmonitorfail.log'
454 457 f = self.vfs.open(fn, 'wb')
455 458 except (IOError, OSError):
456 459 self.ui.warn(_('warning: unable to write to %s\n') % fn)
457 460 return
458 461
459 462 try:
460 463 for i, (s1, s2) in enumerate(zip(l1, l2)):
461 464 if set(s1) != set(s2):
462 465 f.write('sets at position %d are unequal\n' % i)
463 466 f.write('watchman returned: %s\n' % s1)
464 467 f.write('stat returned: %s\n' % s2)
465 468 finally:
466 469 f.close()
467 470
468 471 if isinstance(node1, context.changectx):
469 472 ctx1 = node1
470 473 else:
471 474 ctx1 = self[node1]
472 475 if isinstance(node2, context.changectx):
473 476 ctx2 = node2
474 477 else:
475 478 ctx2 = self[node2]
476 479
477 480 working = ctx2.rev() is None
478 481 parentworking = working and ctx1 == self['.']
479 482 match = match or matchmod.always()
480 483
481 484 # Maybe we can use this opportunity to update Watchman's state.
482 485 # Mercurial uses workingcommitctx and/or memctx to represent the part of
483 486 # the workingctx that is to be committed. So don't update the state in
484 487 # that case.
485 488 # HG_PENDING is set in the environment when the dirstate is being updated
486 489 # in the middle of a transaction; we must not update our state in that
487 490 # case, or we risk forgetting about changes in the working copy.
488 491 updatestate = (parentworking and match.always() and
489 492 not isinstance(ctx2, (context.workingcommitctx,
490 493 context.memctx)) and
491 494 'HG_PENDING' not in encoding.environ)
492 495
493 496 try:
494 497 if self._fsmonitorstate.walk_on_invalidate:
495 498 # Use a short timeout to query the current clock. If that
496 499 # takes too long then we assume that the service will be slow
497 500 # to answer our query.
498 501 # walk_on_invalidate indicates that we prefer to walk the
499 502 # tree ourselves because we can ignore portions that Watchman
500 503 # cannot and we tend to be faster in the warmer buffer cache
501 504 # cases.
502 505 self._watchmanclient.settimeout(0.1)
503 506 else:
504 507 # Give Watchman more time to potentially complete its walk
505 508 # and return the initial clock. In this mode we assume that
506 509 # the filesystem will be slower than parsing a potentially
507 510 # very large Watchman result set.
508 511 self._watchmanclient.settimeout(
509 512 self._fsmonitorstate.timeout + 0.1)
510 513 startclock = self._watchmanclient.getcurrentclock()
511 514 except Exception as ex:
512 515 self._watchmanclient.clearconnection()
513 516 _handleunavailable(self.ui, self._fsmonitorstate, ex)
514 517 # boo, Watchman failed. bail
515 518 return orig(node1, node2, match, listignored, listclean,
516 519 listunknown, listsubrepos)
517 520
518 521 if updatestate:
519 522 # We need info about unknown files. This may make things slower the
520 523 # first time, but whatever.
521 524 stateunknown = True
522 525 else:
523 526 stateunknown = listunknown
524 527
525 528 if updatestate:
526 529 ps = poststatus(startclock)
527 530 self.addpostdsstatus(ps)
528 531
529 532 r = orig(node1, node2, match, listignored, listclean, stateunknown,
530 533 listsubrepos)
531 534 modified, added, removed, deleted, unknown, ignored, clean = r
532 535
533 536 if not listunknown:
534 537 unknown = []
535 538
536 539 # don't do paranoid checks if we're not going to query Watchman anyway
537 540 full = listclean or match.traversedir is not None
538 541 if self._fsmonitorstate.mode == 'paranoid' and not full:
539 542 # run status again and fall back to the old walk this time
540 543 self.dirstate._fsmonitordisable = True
541 544
542 545 # shut the UI up
543 546 quiet = self.ui.quiet
544 547 self.ui.quiet = True
545 548 fout, ferr = self.ui.fout, self.ui.ferr
546 549 self.ui.fout = self.ui.ferr = open(os.devnull, 'wb')
547 550
548 551 try:
549 552 rv2 = orig(
550 553 node1, node2, match, listignored, listclean, listunknown,
551 554 listsubrepos)
552 555 finally:
553 556 self.dirstate._fsmonitordisable = False
554 557 self.ui.quiet = quiet
555 558 self.ui.fout, self.ui.ferr = fout, ferr
556 559
557 560 # clean isn't tested since it's set to True above
558 561 with self.wlock():
559 562 _cmpsets(
560 563 [modified, added, removed, deleted, unknown, ignored, clean],
561 564 rv2)
562 565 modified, added, removed, deleted, unknown, ignored, clean = rv2
563 566
564 567 return scmutil.status(
565 568 modified, added, removed, deleted, unknown, ignored, clean)
566 569
567 570 class poststatus(object):
568 571 def __init__(self, startclock):
569 572 self._startclock = startclock
570 573
571 574 def __call__(self, wctx, status):
572 575 clock = wctx.repo()._fsmonitorstate.getlastclock() or self._startclock
573 576 hashignore = _hashignore(wctx.repo().dirstate._ignore)
574 577 notefiles = (status.modified + status.added + status.removed +
575 578 status.deleted + status.unknown)
576 579 wctx.repo()._fsmonitorstate.set(clock, hashignore, notefiles)
577 580
578 581 def makedirstate(repo, dirstate):
579 582 class fsmonitordirstate(dirstate.__class__):
580 583 def _fsmonitorinit(self, repo):
581 584 # _fsmonitordisable is used in paranoid mode
582 585 self._fsmonitordisable = False
583 586 self._fsmonitorstate = repo._fsmonitorstate
584 587 self._watchmanclient = repo._watchmanclient
585 588 self._repo = weakref.proxy(repo)
586 589
587 590 def walk(self, *args, **kwargs):
588 591 orig = super(fsmonitordirstate, self).walk
589 592 if self._fsmonitordisable:
590 593 return orig(*args, **kwargs)
591 594 return overridewalk(orig, self, *args, **kwargs)
592 595
593 596 def rebuild(self, *args, **kwargs):
594 597 self._fsmonitorstate.invalidate()
595 598 return super(fsmonitordirstate, self).rebuild(*args, **kwargs)
596 599
597 600 def invalidate(self, *args, **kwargs):
598 601 self._fsmonitorstate.invalidate()
599 602 return super(fsmonitordirstate, self).invalidate(*args, **kwargs)
600 603
601 604 dirstate.__class__ = fsmonitordirstate
602 605 dirstate._fsmonitorinit(repo)
603 606
604 607 def wrapdirstate(orig, self):
605 608 ds = orig(self)
606 609 # only override the dirstate when Watchman is available for the repo
607 610 if util.safehasattr(self, '_fsmonitorstate'):
608 611 makedirstate(self, ds)
609 612 return ds
610 613
611 614 def extsetup(ui):
612 615 extensions.wrapfilecache(
613 616 localrepo.localrepository, 'dirstate', wrapdirstate)
614 617 if pycompat.isdarwin:
615 618 # An assist for avoiding the dangling-symlink fsevents bug
616 619 extensions.wrapfunction(os, 'symlink', wrapsymlink)
617 620
618 621 extensions.wrapfunction(merge, 'update', wrapupdate)
619 622
620 623 def wrapsymlink(orig, source, link_name):
621 624 ''' if we create a dangling symlink, also touch the parent dir
622 625 to encourage fsevents notifications to work more correctly '''
623 626 try:
624 627 return orig(source, link_name)
625 628 finally:
626 629 try:
627 630 os.utime(os.path.dirname(link_name), None)
628 631 except OSError:
629 632 pass
630 633
631 634 class state_update(object):
632 635 ''' This context manager is responsible for dispatching the state-enter
633 636 and state-leave signals to the watchman service. The enter and leave
634 637 methods can be invoked manually (for scenarios where context manager
635 638 semantics are not possible). If parameters oldnode and newnode are None,
636 639 they will be populated based on current working copy in enter and
637 640 leave, respectively. Similarly, if the distance is none, it will be
638 641 calculated based on the oldnode and newnode in the leave method.'''
639 642
640 643 def __init__(self, repo, name, oldnode=None, newnode=None, distance=None,
641 644 partial=False):
642 645 self.repo = repo.unfiltered()
643 646 self.name = name
644 647 self.oldnode = oldnode
645 648 self.newnode = newnode
646 649 self.distance = distance
647 650 self.partial = partial
648 651 self._lock = None
649 652 self.need_leave = False
650 653
651 654 def __enter__(self):
652 655 self.enter()
653 656
654 657 def enter(self):
655 658 # Make sure we have a wlock prior to sending notifications to watchman.
656 659 # We don't want to race with other actors. In the update case,
657 660 # merge.update is going to take the wlock almost immediately. We are
658 661 # effectively extending the lock around several short sanity checks.
659 662 if self.oldnode is None:
660 663 self.oldnode = self.repo['.'].node()
661 664
662 665 if self.repo.currentwlock() is None:
663 666 if util.safehasattr(self.repo, 'wlocknostateupdate'):
664 667 self._lock = self.repo.wlocknostateupdate()
665 668 else:
666 669 self._lock = self.repo.wlock()
667 670 self.need_leave = self._state(
668 671 'state-enter',
669 672 hex(self.oldnode))
670 673 return self
671 674
672 675 def __exit__(self, type_, value, tb):
673 676 abort = True if type_ else False
674 677 self.exit(abort=abort)
675 678
676 679 def exit(self, abort=False):
677 680 try:
678 681 if self.need_leave:
679 682 status = 'failed' if abort else 'ok'
680 683 if self.newnode is None:
681 684 self.newnode = self.repo['.'].node()
682 685 if self.distance is None:
683 686 self.distance = calcdistance(
684 687 self.repo, self.oldnode, self.newnode)
685 688 self._state(
686 689 'state-leave',
687 690 hex(self.newnode),
688 691 status=status)
689 692 finally:
690 693 self.need_leave = False
691 694 if self._lock:
692 695 self._lock.release()
693 696
694 697 def _state(self, cmd, commithash, status='ok'):
695 698 if not util.safehasattr(self.repo, '_watchmanclient'):
696 699 return False
697 700 try:
698 701 self.repo._watchmanclient.command(cmd, {
699 702 'name': self.name,
700 703 'metadata': {
701 704 # the target revision
702 705 'rev': commithash,
703 706 # approximate number of commits between current and target
704 707 'distance': self.distance if self.distance else 0,
705 708 # success/failure (only really meaningful for state-leave)
706 709 'status': status,
707 710 # whether the working copy parent is changing
708 711 'partial': self.partial,
709 712 }})
710 713 return True
711 714 except Exception as e:
712 715 # Swallow any errors; fire and forget
713 716 self.repo.ui.log(
714 717 'watchman', 'Exception %s while running %s\n', e, cmd)
715 718 return False
716 719
717 720 # Estimate the distance between two nodes
718 721 def calcdistance(repo, oldnode, newnode):
719 722 anc = repo.changelog.ancestor(oldnode, newnode)
720 723 ancrev = repo[anc].rev()
721 724 distance = (abs(repo[oldnode].rev() - ancrev)
722 725 + abs(repo[newnode].rev() - ancrev))
723 726 return distance
724 727
725 728 # Bracket working copy updates with calls to the watchman state-enter
726 729 # and state-leave commands. This allows clients to perform more intelligent
727 730 # settling during bulk file change scenarios
728 731 # https://facebook.github.io/watchman/docs/cmd/subscribe.html#advanced-settling
729 732 def wrapupdate(orig, repo, node, branchmerge, force, ancestor=None,
730 733 mergeancestor=False, labels=None, matcher=None, **kwargs):
731 734
732 735 distance = 0
733 736 partial = True
734 737 oldnode = repo['.'].node()
735 738 newnode = repo[node].node()
736 739 if matcher is None or matcher.always():
737 740 partial = False
738 741 distance = calcdistance(repo.unfiltered(), oldnode, newnode)
739 742
740 743 with state_update(repo, name="hg.update", oldnode=oldnode, newnode=newnode,
741 744 distance=distance, partial=partial):
742 745 return orig(
743 746 repo, node, branchmerge, force, ancestor, mergeancestor,
744 747 labels, matcher, **kwargs)
745 748
746 749 def repo_has_depth_one_nested_repo(repo):
747 750 for f in repo.wvfs.listdir():
748 751 if os.path.isdir(os.path.join(repo.root, f, '.hg')):
749 752 msg = 'fsmonitor: sub-repository %r detected, fsmonitor disabled\n'
750 753 repo.ui.debug(msg % f)
751 754 return True
752 755 return False
753 756
754 757 def reposetup(ui, repo):
755 758 # We don't work with largefiles or inotify
756 759 exts = extensions.enabled()
757 760 for ext in _blacklist:
758 761 if ext in exts:
759 762 ui.warn(_('The fsmonitor extension is incompatible with the %s '
760 763 'extension and has been disabled.\n') % ext)
761 764 return
762 765
763 766 if repo.local():
764 767 # We don't work with subrepos either.
765 768 #
766 769 # if repo[None].substate can cause a dirstate parse, which is too
767 770 # slow. Instead, look for a file called hgsubstate,
768 771 if repo.wvfs.exists('.hgsubstate') or repo.wvfs.exists('.hgsub'):
769 772 return
770 773
771 774 if repo_has_depth_one_nested_repo(repo):
772 775 return
773 776
774 777 fsmonitorstate = state.state(repo)
775 778 if fsmonitorstate.mode == 'off':
776 779 return
777 780
778 781 try:
779 782 client = watchmanclient.client(repo)
780 783 except Exception as ex:
781 784 _handleunavailable(ui, fsmonitorstate, ex)
782 785 return
783 786
784 787 repo._fsmonitorstate = fsmonitorstate
785 788 repo._watchmanclient = client
786 789
787 790 dirstate, cached = localrepo.isfilecached(repo, 'dirstate')
788 791 if cached:
789 792 # at this point since fsmonitorstate wasn't present,
790 793 # repo.dirstate is not a fsmonitordirstate
791 794 makedirstate(repo, dirstate)
792 795
793 796 class fsmonitorrepo(repo.__class__):
794 797 def status(self, *args, **kwargs):
795 798 orig = super(fsmonitorrepo, self).status
796 799 return overridestatus(orig, self, *args, **kwargs)
797 800
798 801 def wlocknostateupdate(self, *args, **kwargs):
799 802 return super(fsmonitorrepo, self).wlock(*args, **kwargs)
800 803
801 804 def wlock(self, *args, **kwargs):
802 805 l = super(fsmonitorrepo, self).wlock(*args, **kwargs)
803 806 if not ui.configbool(
804 807 "experimental", "fsmonitor.transaction_notify"):
805 808 return l
806 809 if l.held != 1:
807 810 return l
808 811 origrelease = l.releasefn
809 812
810 813 def staterelease():
811 814 if origrelease:
812 815 origrelease()
813 816 if l.stateupdate:
814 817 l.stateupdate.exit()
815 818 l.stateupdate = None
816 819
817 820 try:
818 821 l.stateupdate = None
819 822 l.stateupdate = state_update(self, name="hg.transaction")
820 823 l.stateupdate.enter()
821 824 l.releasefn = staterelease
822 825 except Exception as e:
823 826 # Swallow any errors; fire and forget
824 827 self.ui.log(
825 828 'watchman', 'Exception in state update %s\n', e)
826 829 return l
827 830
828 831 repo.__class__ = fsmonitorrepo
@@ -1,1031 +1,1035 b''
1 1 # Copyright 2014-present Facebook, Inc.
2 2 # All rights reserved.
3 3 #
4 4 # Redistribution and use in source and binary forms, with or without
5 5 # modification, are permitted provided that the following conditions are met:
6 6 #
7 7 # * Redistributions of source code must retain the above copyright notice,
8 8 # this list of conditions and the following disclaimer.
9 9 #
10 10 # * Redistributions in binary form must reproduce the above copyright notice,
11 11 # this list of conditions and the following disclaimer in the documentation
12 12 # and/or other materials provided with the distribution.
13 13 #
14 14 # * Neither the name Facebook nor the names of its contributors may be used to
15 15 # endorse or promote products derived from this software without specific
16 16 # prior written permission.
17 17 #
18 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22 22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24 24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25 25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26 26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 28
29 29 from __future__ import absolute_import
30 30 from __future__ import division
31 31 from __future__ import print_function
32 32 # no unicode literals
33 33
34 34 import inspect
35 35 import math
36 36 import os
37 37 import socket
38 38 import subprocess
39 39 import time
40 40
41 41 # Sometimes it's really hard to get Python extensions to compile,
42 42 # so fall back to a pure Python implementation.
43 43 try:
44 44 from . import bser
45 45 # Demandimport causes modules to be loaded lazily. Force the load now
46 46 # so that we can fall back on pybser if bser doesn't exist
47 47 bser.pdu_info
48 48 except ImportError:
49 49 from . import pybser as bser
50 50
51 51 from mercurial.utils import (
52 52 procutil,
53 53 )
54 54
55 55 from mercurial import (
56 56 pycompat,
57 57 )
58 58
59 59 from . import (
60 60 capabilities,
61 61 compat,
62 62 encoding,
63 63 load,
64 64 )
65 65
66 66
67 67 if os.name == 'nt':
68 68 import ctypes
69 69 import ctypes.wintypes
70 70
71 71 wintypes = ctypes.wintypes
72 72 GENERIC_READ = 0x80000000
73 73 GENERIC_WRITE = 0x40000000
74 74 FILE_FLAG_OVERLAPPED = 0x40000000
75 75 OPEN_EXISTING = 3
76 76 INVALID_HANDLE_VALUE = -1
77 77 FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
78 78 FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
79 79 FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
80 80 WAIT_FAILED = 0xFFFFFFFF
81 81 WAIT_TIMEOUT = 0x00000102
82 82 WAIT_OBJECT_0 = 0x00000000
83 83 WAIT_IO_COMPLETION = 0x000000C0
84 84 INFINITE = 0xFFFFFFFF
85 85
86 86 # Overlapped I/O operation is in progress. (997)
87 87 ERROR_IO_PENDING = 0x000003E5
88 88
89 89 # The pointer size follows the architecture
90 90 # We use WPARAM since this type is already conditionally defined
91 91 ULONG_PTR = ctypes.wintypes.WPARAM
92 92
93 93 class OVERLAPPED(ctypes.Structure):
94 94 _fields_ = [
95 95 ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR),
96 96 ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
97 97 ("hEvent", wintypes.HANDLE)
98 98 ]
99 99
100 100 def __init__(self):
101 101 self.Internal = 0
102 102 self.InternalHigh = 0
103 103 self.Offset = 0
104 104 self.OffsetHigh = 0
105 105 self.hEvent = 0
106 106
107 107 LPDWORD = ctypes.POINTER(wintypes.DWORD)
108 108
109 109 CreateFile = ctypes.windll.kernel32.CreateFileA
110 110 CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD,
111 111 wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
112 112 wintypes.HANDLE]
113 113 CreateFile.restype = wintypes.HANDLE
114 114
115 115 CloseHandle = ctypes.windll.kernel32.CloseHandle
116 116 CloseHandle.argtypes = [wintypes.HANDLE]
117 117 CloseHandle.restype = wintypes.BOOL
118 118
119 119 ReadFile = ctypes.windll.kernel32.ReadFile
120 120 ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
121 121 LPDWORD, ctypes.POINTER(OVERLAPPED)]
122 122 ReadFile.restype = wintypes.BOOL
123 123
124 124 WriteFile = ctypes.windll.kernel32.WriteFile
125 125 WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
126 126 LPDWORD, ctypes.POINTER(OVERLAPPED)]
127 127 WriteFile.restype = wintypes.BOOL
128 128
129 129 GetLastError = ctypes.windll.kernel32.GetLastError
130 130 GetLastError.argtypes = []
131 131 GetLastError.restype = wintypes.DWORD
132 132
133 133 SetLastError = ctypes.windll.kernel32.SetLastError
134 134 SetLastError.argtypes = [wintypes.DWORD]
135 135 SetLastError.restype = None
136 136
137 137 FormatMessage = ctypes.windll.kernel32.FormatMessageA
138 138 FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
139 139 wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
140 140 wintypes.DWORD, wintypes.LPVOID]
141 141 FormatMessage.restype = wintypes.DWORD
142 142
143 143 LocalFree = ctypes.windll.kernel32.LocalFree
144 144
145 145 GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
146 146 GetOverlappedResult.argtypes = [wintypes.HANDLE,
147 147 ctypes.POINTER(OVERLAPPED), LPDWORD,
148 148 wintypes.BOOL]
149 149 GetOverlappedResult.restype = wintypes.BOOL
150 150
151 151 GetOverlappedResultEx = getattr(ctypes.windll.kernel32,
152 152 'GetOverlappedResultEx', None)
153 153 if GetOverlappedResultEx is not None:
154 154 GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
155 155 ctypes.POINTER(OVERLAPPED), LPDWORD,
156 156 wintypes.DWORD, wintypes.BOOL]
157 157 GetOverlappedResultEx.restype = wintypes.BOOL
158 158
159 159 WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
160 160 WaitForSingleObjectEx.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.BOOL]
161 161 WaitForSingleObjectEx.restype = wintypes.DWORD
162 162
163 163 CreateEvent = ctypes.windll.kernel32.CreateEventA
164 164 CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL,
165 165 wintypes.LPSTR]
166 166 CreateEvent.restype = wintypes.HANDLE
167 167
168 168 # Windows Vista is the minimum supported client for CancelIoEx.
169 169 CancelIoEx = ctypes.windll.kernel32.CancelIoEx
170 170 CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
171 171 CancelIoEx.restype = wintypes.BOOL
172 172
173 173 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
174 174 sniff_len = 13
175 175
176 176 # This is a helper for debugging the client.
177 177 _debugging = False
178 178 if _debugging:
179 179
180 180 def log(fmt, *args):
181 181 print('[%s] %s' %
182 182 (time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
183 183 fmt % args[:]))
184 184 else:
185 185
186 186 def log(fmt, *args):
187 187 pass
188 188
189 189
190 190 def _win32_strerror(err):
191 191 """ expand a win32 error code into a human readable message """
192 192
193 193 # FormatMessage will allocate memory and assign it here
194 194 buf = ctypes.c_char_p()
195 195 FormatMessage(
196 196 FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
197 197 | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
198 198 try:
199 199 return buf.value
200 200 finally:
201 201 LocalFree(buf)
202 202
203 203
204 204 class WatchmanError(Exception):
205 205 def __init__(self, msg=None, cmd=None):
206 206 self.msg = msg
207 207 self.cmd = cmd
208 208
209 209 def setCommand(self, cmd):
210 210 self.cmd = cmd
211 211
212 212 def __str__(self):
213 213 if self.cmd:
214 214 return '%s, while executing %s' % (self.msg, self.cmd)
215 215 return self.msg
216 216
217 217
218 218 class WatchmanEnvironmentError(WatchmanError):
219 219 def __init__(self, msg, errno, errmsg, cmd=None):
220 220 super(WatchmanEnvironmentError, self).__init__(
221 221 '{0}: errno={1} errmsg={2}'.format(msg, errno, errmsg),
222 222 cmd)
223 223
224 224
225 225 class SocketConnectError(WatchmanError):
226 226 def __init__(self, sockpath, exc):
227 227 super(SocketConnectError, self).__init__(
228 228 'unable to connect to %s: %s' % (sockpath, exc))
229 229 self.sockpath = sockpath
230 230 self.exc = exc
231 231
232 232
233 233 class SocketTimeout(WatchmanError):
234 234 """A specialized exception raised for socket timeouts during communication to/from watchman.
235 235 This makes it easier to implement non-blocking loops as callers can easily distinguish
236 236 between a routine timeout and an actual error condition.
237 237
238 238 Note that catching WatchmanError will also catch this as it is a super-class, so backwards
239 239 compatibility in exception handling is preserved.
240 240 """
241 241
242 242
243 243 class CommandError(WatchmanError):
244 244 """error returned by watchman
245 245
246 246 self.msg is the message returned by watchman.
247 247 """
248 248 def __init__(self, msg, cmd=None):
249 249 super(CommandError, self).__init__(
250 250 'watchman command error: %s' % (msg, ),
251 251 cmd,
252 252 )
253 253
254 254
255 255 class Transport(object):
256 256 """ communication transport to the watchman server """
257 257 buf = None
258 258
259 259 def close(self):
260 260 """ tear it down """
261 261 raise NotImplementedError()
262 262
263 263 def readBytes(self, size):
264 264 """ read size bytes """
265 265 raise NotImplementedError()
266 266
267 267 def write(self, buf):
268 268 """ write some data """
269 269 raise NotImplementedError()
270 270
271 271 def setTimeout(self, value):
272 272 pass
273 273
274 274 def readLine(self):
275 275 """ read a line
276 276 Maintains its own buffer, callers of the transport should not mix
277 277 calls to readBytes and readLine.
278 278 """
279 279 if self.buf is None:
280 280 self.buf = []
281 281
282 282 # Buffer may already have a line if we've received unilateral
283 283 # response(s) from the server
284 284 if len(self.buf) == 1 and b"\n" in self.buf[0]:
285 285 (line, b) = self.buf[0].split(b"\n", 1)
286 286 self.buf = [b]
287 287 return line
288 288
289 289 while True:
290 290 b = self.readBytes(4096)
291 291 if b"\n" in b:
292 292 result = b''.join(self.buf)
293 293 (line, b) = b.split(b"\n", 1)
294 294 self.buf = [b]
295 295 return result + line
296 296 self.buf.append(b)
297 297
298 298
299 299 class Codec(object):
300 300 """ communication encoding for the watchman server """
301 301 transport = None
302 302
303 303 def __init__(self, transport):
304 304 self.transport = transport
305 305
306 306 def receive(self):
307 307 raise NotImplementedError()
308 308
309 309 def send(self, *args):
310 310 raise NotImplementedError()
311 311
312 312 def setTimeout(self, value):
313 313 self.transport.setTimeout(value)
314 314
315 315
316 316 class UnixSocketTransport(Transport):
317 317 """ local unix domain socket transport """
318 318 sock = None
319 319
320 def __init__(self, sockpath, timeout):
320 def __init__(self, sockpath, timeout, watchman_exe):
321 321 self.sockpath = sockpath
322 322 self.timeout = timeout
323 323
324 324 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
325 325 try:
326 326 sock.settimeout(self.timeout)
327 327 sock.connect(self.sockpath)
328 328 self.sock = sock
329 329 except socket.error as e:
330 330 sock.close()
331 331 raise SocketConnectError(self.sockpath, e)
332 332
333 333 def close(self):
334 334 self.sock.close()
335 335 self.sock = None
336 336
337 337 def setTimeout(self, value):
338 338 self.timeout = value
339 339 self.sock.settimeout(self.timeout)
340 340
341 341 def readBytes(self, size):
342 342 try:
343 343 buf = [self.sock.recv(size)]
344 344 if not buf[0]:
345 345 raise WatchmanError('empty watchman response')
346 346 return buf[0]
347 347 except socket.timeout:
348 348 raise SocketTimeout('timed out waiting for response')
349 349
350 350 def write(self, data):
351 351 try:
352 352 self.sock.sendall(data)
353 353 except socket.timeout:
354 354 raise SocketTimeout('timed out sending query command')
355 355
356 356
357 357 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
358 358 """ Windows 7 and earlier does not support GetOverlappedResultEx. The
359 359 alternative is to use GetOverlappedResult and wait for read or write
360 360 operation to complete. This is done be using CreateEvent and
361 361 WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
362 362 and GetOverlappedResult are all part of Windows API since WindowsXP.
363 363 This is the exact same implementation that can be found in the watchman
364 364 source code (see get_overlapped_result_ex_impl in stream_win.c). This
365 365 way, maintenance should be simplified.
366 366 """
367 367 log('Preparing to wait for maximum %dms', millis )
368 368 if millis != 0:
369 369 waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
370 370 if waitReturnCode == WAIT_OBJECT_0:
371 371 # Event is signaled, overlapped IO operation result should be available.
372 372 pass
373 373 elif waitReturnCode == WAIT_IO_COMPLETION:
374 374 # WaitForSingleObjectEx returnes because the system added an I/O completion
375 375 # routine or an asynchronous procedure call (APC) to the thread queue.
376 376 SetLastError(WAIT_IO_COMPLETION)
377 377 pass
378 378 elif waitReturnCode == WAIT_TIMEOUT:
379 379 # We reached the maximum allowed wait time, the IO operation failed
380 380 # to complete in timely fashion.
381 381 SetLastError(WAIT_TIMEOUT)
382 382 return False
383 383 elif waitReturnCode == WAIT_FAILED:
384 384 # something went wrong calling WaitForSingleObjectEx
385 385 err = GetLastError()
386 386 log('WaitForSingleObjectEx failed: %s', _win32_strerror(err))
387 387 return False
388 388 else:
389 389 # unexpected situation deserving investigation.
390 390 err = GetLastError()
391 391 log('Unexpected error: %s', _win32_strerror(err))
392 392 return False
393 393
394 394 return GetOverlappedResult(pipe, olap, nbytes, False)
395 395
396 396
397 397 class WindowsNamedPipeTransport(Transport):
398 398 """ connect to a named pipe """
399 399
400 def __init__(self, sockpath, timeout):
400 def __init__(self, sockpath, timeout, watchman_exe):
401 401 self.sockpath = sockpath
402 402 self.timeout = int(math.ceil(timeout * 1000))
403 403 self._iobuf = None
404 404
405 405 self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None,
406 406 OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None)
407 407
408 408 if self.pipe == INVALID_HANDLE_VALUE:
409 409 self.pipe = None
410 410 self._raise_win_err('failed to open pipe %s' % sockpath,
411 411 GetLastError())
412 412
413 413 # event for the overlapped I/O operations
414 414 self._waitable = CreateEvent(None, True, False, None)
415 415 if self._waitable is None:
416 416 self._raise_win_err('CreateEvent failed', GetLastError())
417 417
418 418 self._get_overlapped_result_ex = GetOverlappedResultEx
419 419 if (os.getenv('WATCHMAN_WIN7_COMPAT') == '1' or
420 420 self._get_overlapped_result_ex is None):
421 421 self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
422 422
423 423 def _raise_win_err(self, msg, err):
424 424 raise IOError('%s win32 error code: %d %s' %
425 425 (msg, err, _win32_strerror(err)))
426 426
427 427 def close(self):
428 428 if self.pipe:
429 429 log('Closing pipe')
430 430 CloseHandle(self.pipe)
431 431 self.pipe = None
432 432
433 433 if self._waitable is not None:
434 434 # We release the handle for the event
435 435 CloseHandle(self._waitable)
436 436 self._waitable = None
437 437
438 438 def setTimeout(self, value):
439 439 # convert to milliseconds
440 440 self.timeout = int(value * 1000)
441 441
442 442 def readBytes(self, size):
443 443 """ A read can block for an unbounded amount of time, even if the
444 444 kernel reports that the pipe handle is signalled, so we need to
445 445 always perform our reads asynchronously
446 446 """
447 447
448 448 # try to satisfy the read from any buffered data
449 449 if self._iobuf:
450 450 if size >= len(self._iobuf):
451 451 res = self._iobuf
452 452 self.buf = None
453 453 return res
454 454 res = self._iobuf[:size]
455 455 self._iobuf = self._iobuf[size:]
456 456 return res
457 457
458 458 # We need to initiate a read
459 459 buf = ctypes.create_string_buffer(size)
460 460 olap = OVERLAPPED()
461 461 olap.hEvent = self._waitable
462 462
463 463 log('made read buff of size %d', size)
464 464
465 465 # ReadFile docs warn against sending in the nread parameter for async
466 466 # operations, so we always collect it via GetOverlappedResultEx
467 467 immediate = ReadFile(self.pipe, buf, size, None, olap)
468 468
469 469 if not immediate:
470 470 err = GetLastError()
471 471 if err != ERROR_IO_PENDING:
472 472 self._raise_win_err('failed to read %d bytes' % size,
473 473 GetLastError())
474 474
475 475 nread = wintypes.DWORD()
476 476 if not self._get_overlapped_result_ex(self.pipe, olap, nread,
477 477 0 if immediate else self.timeout,
478 478 True):
479 479 err = GetLastError()
480 480 CancelIoEx(self.pipe, olap)
481 481
482 482 if err == WAIT_TIMEOUT:
483 483 log('GetOverlappedResultEx timedout')
484 484 raise SocketTimeout('timed out after waiting %dms for read' %
485 485 self.timeout)
486 486
487 487 log('GetOverlappedResultEx reports error %d', err)
488 488 self._raise_win_err('error while waiting for read', err)
489 489
490 490 nread = nread.value
491 491 if nread == 0:
492 492 # Docs say that named pipes return 0 byte when the other end did
493 493 # a zero byte write. Since we don't ever do that, the only
494 494 # other way this shows up is if the client has gotten in a weird
495 495 # state, so let's bail out
496 496 CancelIoEx(self.pipe, olap)
497 497 raise IOError('Async read yielded 0 bytes; unpossible!')
498 498
499 499 # Holds precisely the bytes that we read from the prior request
500 500 buf = buf[:nread]
501 501
502 502 returned_size = min(nread, size)
503 503 if returned_size == nread:
504 504 return buf
505 505
506 506 # keep any left-overs around for a later read to consume
507 507 self._iobuf = buf[returned_size:]
508 508 return buf[:returned_size]
509 509
510 510 def write(self, data):
511 511 olap = OVERLAPPED()
512 512 olap.hEvent = self._waitable
513 513
514 514 immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
515 515 None, olap)
516 516
517 517 if not immediate:
518 518 err = GetLastError()
519 519 if err != ERROR_IO_PENDING:
520 520 self._raise_win_err('failed to write %d bytes' % len(data),
521 521 GetLastError())
522 522
523 523 # Obtain results, waiting if needed
524 524 nwrote = wintypes.DWORD()
525 525 if self._get_overlapped_result_ex(self.pipe, olap, nwrote,
526 526 0 if immediate else self.timeout,
527 527 True):
528 528 log('made write of %d bytes', nwrote.value)
529 529 return nwrote.value
530 530
531 531 err = GetLastError()
532 532
533 533 # It's potentially unsafe to allow the write to continue after
534 534 # we unwind, so let's make a best effort to avoid that happening
535 535 CancelIoEx(self.pipe, olap)
536 536
537 537 if err == WAIT_TIMEOUT:
538 538 raise SocketTimeout('timed out after waiting %dms for write' %
539 539 self.timeout)
540 540 self._raise_win_err('error while waiting for write of %d bytes' %
541 541 len(data), err)
542 542
543 543
544 544 class CLIProcessTransport(Transport):
545 545 """ open a pipe to the cli to talk to the service
546 546 This intended to be used only in the test harness!
547 547
548 548 The CLI is an oddball because we only support JSON input
549 549 and cannot send multiple commands through the same instance,
550 550 so we spawn a new process for each command.
551 551
552 552 We disable server spawning for this implementation, again, because
553 553 it is intended to be used only in our test harness. You really
554 554 should not need to use the CLI transport for anything real.
555 555
556 556 While the CLI can output in BSER, our Transport interface doesn't
557 557 support telling this instance that it should do so. That effectively
558 558 limits this implementation to JSON input and output only at this time.
559 559
560 560 It is the responsibility of the caller to set the send and
561 561 receive codecs appropriately.
562 562 """
563 563 proc = None
564 564 closed = True
565 565
566 def __init__(self, sockpath, timeout):
566 def __init__(self, sockpath, timeout, watchman_exe):
567 567 self.sockpath = sockpath
568 568 self.timeout = timeout
569 self.watchman_exe = watchman_exe
569 570
570 571 def close(self):
571 572 if self.proc:
572 573 if self.proc.pid is not None:
573 574 self.proc.kill()
574 575 self.proc.stdin.close()
575 576 self.proc.stdout.close()
576 577 self.proc = None
577 578
578 579 def _connect(self):
579 580 if self.proc:
580 581 return self.proc
581 582 args = [
582 'watchman',
583 self.watchman_exe,
583 584 '--sockname={0}'.format(self.sockpath),
584 585 '--logfile=/BOGUS',
585 586 '--statefile=/BOGUS',
586 587 '--no-spawn',
587 588 '--no-local',
588 589 '--no-pretty',
589 590 '-j',
590 591 ]
591 592 self.proc = subprocess.Popen(pycompat.rapply(procutil.tonativestr,
592 593 args),
593 594 stdin=subprocess.PIPE,
594 595 stdout=subprocess.PIPE)
595 596 return self.proc
596 597
597 598 def readBytes(self, size):
598 599 self._connect()
599 600 res = self.proc.stdout.read(size)
600 601 if res == '':
601 602 raise WatchmanError('EOF on CLI process transport')
602 603 return res
603 604
604 605 def write(self, data):
605 606 if self.closed:
606 607 self.close()
607 608 self.closed = False
608 609 self._connect()
609 610 res = self.proc.stdin.write(data)
610 611 self.proc.stdin.close()
611 612 self.closed = True
612 613 return res
613 614
614 615
615 616 class BserCodec(Codec):
616 617 """ use the BSER encoding. This is the default, preferred codec """
617 618
618 619 def _loads(self, response):
619 620 return bser.loads(response) # Defaults to BSER v1
620 621
621 622 def receive(self):
622 623 buf = [self.transport.readBytes(sniff_len)]
623 624 if not buf[0]:
624 625 raise WatchmanError('empty watchman response')
625 626
626 627 _1, _2, elen = bser.pdu_info(buf[0])
627 628
628 629 rlen = len(buf[0])
629 630 while elen > rlen:
630 631 buf.append(self.transport.readBytes(elen - rlen))
631 632 rlen += len(buf[-1])
632 633
633 634 response = b''.join(buf)
634 635 try:
635 636 res = self._loads(response)
636 637 return res
637 638 except ValueError as e:
638 639 raise WatchmanError('watchman response decode error: %s' % e)
639 640
640 641 def send(self, *args):
641 642 cmd = bser.dumps(*args) # Defaults to BSER v1
642 643 self.transport.write(cmd)
643 644
644 645
645 646 class ImmutableBserCodec(BserCodec):
646 647 """ use the BSER encoding, decoding values using the newer
647 648 immutable object support """
648 649
649 650 def _loads(self, response):
650 651 return bser.loads(response, False) # Defaults to BSER v1
651 652
652 653
653 654 class Bser2WithFallbackCodec(BserCodec):
654 655 """ use BSER v2 encoding """
655 656
656 657 def __init__(self, transport):
657 658 super(Bser2WithFallbackCodec, self).__init__(transport)
658 659 # Once the server advertises support for bser-v2 we should switch this
659 660 # to 'required' on Python 3.
660 661 self.send(["version", {"optional": ["bser-v2"]}])
661 662
662 663 capabilities = self.receive()
663 664
664 665 if 'error' in capabilities:
665 666 raise Exception('Unsupported BSER version')
666 667
667 668 if capabilities['capabilities']['bser-v2']:
668 669 self.bser_version = 2
669 670 self.bser_capabilities = 0
670 671 else:
671 672 self.bser_version = 1
672 673 self.bser_capabilities = 0
673 674
674 675 def _loads(self, response):
675 676 return bser.loads(response)
676 677
677 678 def receive(self):
678 679 buf = [self.transport.readBytes(sniff_len)]
679 680 if not buf[0]:
680 681 raise WatchmanError('empty watchman response')
681 682
682 683 recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
683 684
684 685 if hasattr(self, 'bser_version'):
685 686 # Readjust BSER version and capabilities if necessary
686 687 self.bser_version = max(self.bser_version, recv_bser_version)
687 688 self.capabilities = self.bser_capabilities & recv_bser_capabilities
688 689
689 690 rlen = len(buf[0])
690 691 while elen > rlen:
691 692 buf.append(self.transport.readBytes(elen - rlen))
692 693 rlen += len(buf[-1])
693 694
694 695 response = b''.join(buf)
695 696 try:
696 697 res = self._loads(response)
697 698 return res
698 699 except ValueError as e:
699 700 raise WatchmanError('watchman response decode error: %s' % e)
700 701
701 702 def send(self, *args):
702 703 if hasattr(self, 'bser_version'):
703 704 cmd = bser.dumps(*args, version=self.bser_version,
704 705 capabilities=self.bser_capabilities)
705 706 else:
706 707 cmd = bser.dumps(*args)
707 708 self.transport.write(cmd)
708 709
709 710
710 711 class JsonCodec(Codec):
711 712 """ Use json codec. This is here primarily for testing purposes """
712 713 json = None
713 714
714 715 def __init__(self, transport):
715 716 super(JsonCodec, self).__init__(transport)
716 717 # optional dep on json, only if JsonCodec is used
717 718 import json
718 719 self.json = json
719 720
720 721 def receive(self):
721 722 line = self.transport.readLine()
722 723 try:
723 724 # In Python 3, json.loads is a transformation from Unicode string to
724 725 # objects possibly containing Unicode strings. We typically expect
725 726 # the JSON blob to be ASCII-only with non-ASCII characters escaped,
726 727 # but it's possible we might get non-ASCII bytes that are valid
727 728 # UTF-8.
728 729 if compat.PYTHON3:
729 730 line = line.decode('utf-8')
730 731 return self.json.loads(line)
731 732 except Exception as e:
732 733 print(e, line)
733 734 raise
734 735
735 736 def send(self, *args):
736 737 cmd = self.json.dumps(*args)
737 738 # In Python 3, json.dumps is a transformation from objects possibly
738 739 # containing Unicode strings to Unicode string. Even with (the default)
739 740 # ensure_ascii=True, dumps returns a Unicode string.
740 741 if compat.PYTHON3:
741 742 cmd = cmd.encode('ascii')
742 743 self.transport.write(cmd + b"\n")
743 744
744 745
745 746 class client(object):
746 747 """ Handles the communication with the watchman service """
747 748 sockpath = None
748 749 transport = None
749 750 sendCodec = None
750 751 recvCodec = None
751 752 sendConn = None
752 753 recvConn = None
753 754 subs = {} # Keyed by subscription name
754 755 sub_by_root = {} # Keyed by root, then by subscription name
755 756 logs = [] # When log level is raised
756 757 unilateral = ['log', 'subscription']
757 758 tport = None
758 759 useImmutableBser = None
760 watchman_exe = None
759 761
760 762 def __init__(self,
761 763 sockpath=None,
762 764 timeout=1.0,
763 765 transport=None,
764 766 sendEncoding=None,
765 767 recvEncoding=None,
766 useImmutableBser=False):
768 useImmutableBser=False,
769 watchman_exe=None):
767 770 self.sockpath = sockpath
768 771 self.timeout = timeout
769 772 self.useImmutableBser = useImmutableBser
773 self.watchman_exe = watchman_exe
770 774
771 775 if inspect.isclass(transport) and issubclass(transport, Transport):
772 776 self.transport = transport
773 777 else:
774 778 transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
775 779 if transport == 'local' and os.name == 'nt':
776 780 self.transport = WindowsNamedPipeTransport
777 781 elif transport == 'local':
778 782 self.transport = UnixSocketTransport
779 783 elif transport == 'cli':
780 784 self.transport = CLIProcessTransport
781 785 if sendEncoding is None:
782 786 sendEncoding = 'json'
783 787 if recvEncoding is None:
784 788 recvEncoding = sendEncoding
785 789 else:
786 790 raise WatchmanError('invalid transport %s' % transport)
787 791
788 792 sendEncoding = str(sendEncoding or os.getenv('WATCHMAN_ENCODING') or
789 793 'bser')
790 794 recvEncoding = str(recvEncoding or os.getenv('WATCHMAN_ENCODING') or
791 795 'bser')
792 796
793 797 self.recvCodec = self._parseEncoding(recvEncoding)
794 798 self.sendCodec = self._parseEncoding(sendEncoding)
795 799
796 800 def _parseEncoding(self, enc):
797 801 if enc == 'bser':
798 802 if self.useImmutableBser:
799 803 return ImmutableBserCodec
800 804 return BserCodec
801 805 elif enc == 'experimental-bser-v2':
802 806 return Bser2WithFallbackCodec
803 807 elif enc == 'json':
804 808 return JsonCodec
805 809 else:
806 810 raise WatchmanError('invalid encoding %s' % enc)
807 811
808 812 def _hasprop(self, result, name):
809 813 if self.useImmutableBser:
810 814 return hasattr(result, name)
811 815 return name in result
812 816
813 817 def _resolvesockname(self):
814 818 # if invoked via a trigger, watchman will set this env var; we
815 819 # should use it unless explicitly set otherwise
816 820 path = os.getenv('WATCHMAN_SOCK')
817 821 if path:
818 822 return path
819 823
820 cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
824 cmd = [self.watchman_exe, '--output-encoding=bser', 'get-sockname']
821 825 try:
822 826 args = dict(stdout=subprocess.PIPE,
823 827 stderr=subprocess.PIPE,
824 828 close_fds=os.name != 'nt')
825 829
826 830 if os.name == 'nt':
827 831 # if invoked via an application with graphical user interface,
828 832 # this call will cause a brief command window pop-up.
829 833 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
830 834 startupinfo = subprocess.STARTUPINFO()
831 835 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
832 836 args['startupinfo'] = startupinfo
833 837
834 838 p = subprocess.Popen(pycompat.rapply(procutil.tonativestr, cmd),
835 839 **args)
836 840
837 841 except OSError as e:
838 842 raise WatchmanError('"watchman" executable not in PATH (%s)' % e)
839 843
840 844 stdout, stderr = p.communicate()
841 845 exitcode = p.poll()
842 846
843 847 if exitcode:
844 848 raise WatchmanError("watchman exited with code %d" % exitcode)
845 849
846 850 result = bser.loads(stdout)
847 851 if b'error' in result:
848 852 raise WatchmanError('get-sockname error: %s' % result['error'])
849 853
850 854 return result[b'sockname']
851 855
852 856 def _connect(self):
853 857 """ establish transport connection """
854 858
855 859 if self.recvConn:
856 860 return
857 861
858 862 if self.sockpath is None:
859 863 self.sockpath = self._resolvesockname()
860 864
861 self.tport = self.transport(self.sockpath, self.timeout)
865 self.tport = self.transport(self.sockpath, self.timeout, self.watchman_exe)
862 866 self.sendConn = self.sendCodec(self.tport)
863 867 self.recvConn = self.recvCodec(self.tport)
864 868
865 869 def __del__(self):
866 870 self.close()
867 871
868 872 def close(self):
869 873 if self.tport:
870 874 self.tport.close()
871 875 self.tport = None
872 876 self.recvConn = None
873 877 self.sendConn = None
874 878
875 879 def receive(self):
876 880 """ receive the next PDU from the watchman service
877 881
878 882 If the client has activated subscriptions or logs then
879 883 this PDU may be a unilateral PDU sent by the service to
880 884 inform the client of a log event or subscription change.
881 885
882 886 It may also simply be the response portion of a request
883 887 initiated by query.
884 888
885 889 There are clients in production that subscribe and call
886 890 this in a loop to retrieve all subscription responses,
887 891 so care should be taken when making changes here.
888 892 """
889 893
890 894 self._connect()
891 895 result = self.recvConn.receive()
892 896 if self._hasprop(result, 'error'):
893 897 error = result['error']
894 898 if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
895 899 error = result['error'].decode('utf-8', 'surrogateescape')
896 900 raise CommandError(error)
897 901
898 902 if self._hasprop(result, 'log'):
899 903 log = result['log']
900 904 if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
901 905 log = log.decode('utf-8', 'surrogateescape')
902 906 self.logs.append(log)
903 907
904 908 if self._hasprop(result, 'subscription'):
905 909 sub = result['subscription']
906 910 if not (sub in self.subs):
907 911 self.subs[sub] = []
908 912 self.subs[sub].append(result)
909 913
910 914 # also accumulate in {root,sub} keyed store
911 915 root = os.path.normcase(result['root'])
912 916 if not root in self.sub_by_root:
913 917 self.sub_by_root[root] = {}
914 918 if not sub in self.sub_by_root[root]:
915 919 self.sub_by_root[root][sub] = []
916 920 self.sub_by_root[root][sub].append(result)
917 921
918 922 return result
919 923
920 924 def isUnilateralResponse(self, res):
921 925 if 'unilateral' in res and res['unilateral']:
922 926 return True
923 927 # Fall back to checking for known unilateral responses
924 928 for k in self.unilateral:
925 929 if k in res:
926 930 return True
927 931 return False
928 932
929 933 def getLog(self, remove=True):
930 934 """ Retrieve buffered log data
931 935
932 936 If remove is true the data will be removed from the buffer.
933 937 Otherwise it will be left in the buffer
934 938 """
935 939 res = self.logs
936 940 if remove:
937 941 self.logs = []
938 942 return res
939 943
940 944 def getSubscription(self, name, remove=True, root=None):
941 945 """ Retrieve the data associated with a named subscription
942 946
943 947 If remove is True (the default), the subscription data is removed
944 948 from the buffer. Otherwise the data is returned but left in
945 949 the buffer.
946 950
947 951 Returns None if there is no data associated with `name`
948 952
949 953 If root is not None, then only return the subscription
950 954 data that matches both root and name. When used in this way,
951 955 remove processing impacts both the unscoped and scoped stores
952 956 for the subscription data.
953 957 """
954 958 if compat.PYTHON3 and issubclass(self.recvCodec, BserCodec):
955 959 # People may pass in Unicode strings here -- but currently BSER only
956 960 # returns bytestrings. Deal with that.
957 961 if isinstance(root, str):
958 962 root = encoding.encode_local(root)
959 963 if isinstance(name, str):
960 964 name = name.encode('utf-8')
961 965
962 966 if root is not None:
963 967 if not root in self.sub_by_root:
964 968 return None
965 969 if not name in self.sub_by_root[root]:
966 970 return None
967 971 sub = self.sub_by_root[root][name]
968 972 if remove:
969 973 del self.sub_by_root[root][name]
970 974 # don't let this grow unbounded
971 975 if name in self.subs:
972 976 del self.subs[name]
973 977 return sub
974 978
975 979 if not (name in self.subs):
976 980 return None
977 981 sub = self.subs[name]
978 982 if remove:
979 983 del self.subs[name]
980 984 return sub
981 985
982 986 def query(self, *args):
983 987 """ Send a query to the watchman service and return the response
984 988
985 989 This call will block until the response is returned.
986 990 If any unilateral responses are sent by the service in between
987 991 the request-response they will be buffered up in the client object
988 992 and NOT returned via this method.
989 993 """
990 994
991 995 log('calling client.query')
992 996 self._connect()
993 997 try:
994 998 self.sendConn.send(args)
995 999
996 1000 res = self.receive()
997 1001 while self.isUnilateralResponse(res):
998 1002 res = self.receive()
999 1003
1000 1004 return res
1001 1005 except EnvironmentError as ee:
1002 1006 # When we can depend on Python 3, we can use PEP 3134
1003 1007 # exception chaining here.
1004 1008 raise WatchmanEnvironmentError(
1005 1009 'I/O error communicating with watchman daemon',
1006 1010 ee.errno,
1007 1011 ee.strerror,
1008 1012 args)
1009 1013 except WatchmanError as ex:
1010 1014 ex.setCommand(args)
1011 1015 raise
1012 1016
1013 1017 def capabilityCheck(self, optional=None, required=None):
1014 1018 """ Perform a server capability check """
1015 1019 res = self.query('version', {
1016 1020 'optional': optional or [],
1017 1021 'required': required or []
1018 1022 })
1019 1023
1020 1024 if not self._hasprop(res, 'capabilities'):
1021 1025 # Server doesn't support capabilities, so we need to
1022 1026 # synthesize the results based on the version
1023 1027 capabilities.synthesize(res, optional)
1024 1028 if 'error' in res:
1025 1029 raise CommandError(res['error'])
1026 1030
1027 1031 return res
1028 1032
1029 1033 def setTimeout(self, value):
1030 1034 self.recvConn.setTimeout(value)
1031 1035 self.sendConn.setTimeout(value)
@@ -1,109 +1,111 b''
1 1 # watchmanclient.py - Watchman client for the fsmonitor extension
2 2 #
3 3 # Copyright 2013-2016 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import getpass
11 11
12 12 from mercurial import util
13 13
14 14 from . import pywatchman
15 15
16 16 class Unavailable(Exception):
17 17 def __init__(self, msg, warn=True, invalidate=False):
18 18 self.msg = msg
19 19 self.warn = warn
20 20 if self.msg == 'timed out waiting for response':
21 21 self.warn = False
22 22 self.invalidate = invalidate
23 23
24 24 def __str__(self):
25 25 if self.warn:
26 26 return 'warning: Watchman unavailable: %s' % self.msg
27 27 else:
28 28 return 'Watchman unavailable: %s' % self.msg
29 29
30 30 class WatchmanNoRoot(Unavailable):
31 31 def __init__(self, root, msg):
32 32 self.root = root
33 33 super(WatchmanNoRoot, self).__init__(msg)
34 34
35 35 class client(object):
36 36 def __init__(self, repo, timeout=1.0):
37 37 err = None
38 38 if not self._user:
39 39 err = "couldn't get user"
40 40 warn = True
41 41 if self._user in repo.ui.configlist('fsmonitor', 'blacklistusers'):
42 42 err = 'user %s in blacklist' % self._user
43 43 warn = False
44 44
45 45 if err:
46 46 raise Unavailable(err, warn)
47 47
48 48 self._timeout = timeout
49 49 self._watchmanclient = None
50 50 self._root = repo.root
51 51 self._ui = repo.ui
52 52 self._firsttime = True
53 53
54 54 def settimeout(self, timeout):
55 55 self._timeout = timeout
56 56 if self._watchmanclient is not None:
57 57 self._watchmanclient.setTimeout(timeout)
58 58
59 59 def getcurrentclock(self):
60 60 result = self.command('clock')
61 61 if not util.safehasattr(result, 'clock'):
62 62 raise Unavailable('clock result is missing clock value',
63 63 invalidate=True)
64 64 return result.clock
65 65
66 66 def clearconnection(self):
67 67 self._watchmanclient = None
68 68
69 69 def available(self):
70 70 return self._watchmanclient is not None or self._firsttime
71 71
72 72 @util.propertycache
73 73 def _user(self):
74 74 try:
75 75 return getpass.getuser()
76 76 except KeyError:
77 77 # couldn't figure out our user
78 78 return None
79 79
80 80 def _command(self, *args):
81 81 watchmanargs = (args[0], self._root) + args[1:]
82 82 try:
83 83 if self._watchmanclient is None:
84 84 self._firsttime = False
85 watchman_exe = self._ui.configpath('fsmonitor', 'watchman_exe')
85 86 self._watchmanclient = pywatchman.client(
86 87 timeout=self._timeout,
87 useImmutableBser=True)
88 useImmutableBser=True,
89 watchman_exe=watchman_exe)
88 90 return self._watchmanclient.query(*watchmanargs)
89 91 except pywatchman.CommandError as ex:
90 92 if 'unable to resolve root' in ex.msg:
91 93 raise WatchmanNoRoot(self._root, ex.msg)
92 94 raise Unavailable(ex.msg)
93 95 except pywatchman.WatchmanError as ex:
94 96 raise Unavailable(str(ex))
95 97
96 98 def command(self, *args):
97 99 try:
98 100 try:
99 101 return self._command(*args)
100 102 except WatchmanNoRoot:
101 103 # this 'watch' command can also raise a WatchmanNoRoot if
102 104 # watchman refuses to accept this root
103 105 self._command('watch')
104 106 return self._command(*args)
105 107 except Unavailable:
106 108 # this is in an outer scope to catch Unavailable form any of the
107 109 # above _command calls
108 110 self._watchmanclient = None
109 111 raise
General Comments 0
You need to be logged in to leave comments. Login now