##// END OF EJS Templates
inotify: create a common, OS-independent server entry point...
Nicolas Dumazet -
r9933:2e790215 default
parent child Browse files
Show More
@@ -1,4 +1,4 b''
1 # server.py - inotify status server
1 # linuxserver.py - inotify status server for linux
2 #
2 #
3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
@@ -7,9 +7,10 b''
7 # GNU General Public License version 2, incorporated herein by reference.
7 # GNU General Public License version 2, incorporated herein by reference.
8
8
9 from mercurial.i18n import _
9 from mercurial.i18n import _
10 from mercurial import cmdutil, osutil, util
10 from mercurial import osutil, util
11 import common
11 import common
12 import errno, os, select, socket, stat, struct, sys, tempfile, time
12 import server
13 import errno, os, select, stat, sys, time
13
14
14 try:
15 try:
15 import linux as inotify
16 import linux as inotify
@@ -17,28 +18,11 b' try:'
17 except ImportError:
18 except ImportError:
18 raise
19 raise
19
20
20 class AlreadyStartedException(Exception): pass
21
22 def join(a, b):
23 if a:
24 if a[-1] == '/':
25 return a + b
26 return a + '/' + b
27 return b
28
29 def split(path):
30 c = path.rfind('/')
31 if c == -1:
32 return '', path
33 return path[:c], path[c+1:]
34
35 walk_ignored_errors = (errno.ENOENT, errno.ENAMETOOLONG)
36
37 def walkrepodirs(dirstate, absroot):
21 def walkrepodirs(dirstate, absroot):
38 '''Iterate over all subdirectories of this repo.
22 '''Iterate over all subdirectories of this repo.
39 Exclude the .hg directory, any nested repos, and ignored dirs.'''
23 Exclude the .hg directory, any nested repos, and ignored dirs.'''
40 def walkit(dirname, top):
24 def walkit(dirname, top):
41 fullpath = join(absroot, dirname)
25 fullpath = server.join(absroot, dirname)
42 try:
26 try:
43 for name, kind in osutil.listdir(fullpath):
27 for name, kind in osutil.listdir(fullpath):
44 if kind == stat.S_IFDIR:
28 if kind == stat.S_IFDIR:
@@ -46,54 +30,18 b' def walkrepodirs(dirstate, absroot):'
46 if not top:
30 if not top:
47 return
31 return
48 else:
32 else:
49 d = join(dirname, name)
33 d = server.join(dirname, name)
50 if dirstate._ignore(d):
34 if dirstate._ignore(d):
51 continue
35 continue
52 for subdir in walkit(d, False):
36 for subdir in walkit(d, False):
53 yield subdir
37 yield subdir
54 except OSError, err:
38 except OSError, err:
55 if err.errno not in walk_ignored_errors:
39 if err.errno not in server.walk_ignored_errors:
56 raise
40 raise
57 yield fullpath
41 yield fullpath
58
42
59 return walkit('', True)
43 return walkit('', True)
60
44
61 def walk(dirstate, absroot, root):
62 '''Like os.walk, but only yields regular files.'''
63
64 # This function is critical to performance during startup.
65
66 def walkit(root, reporoot):
67 files, dirs = [], []
68
69 try:
70 fullpath = join(absroot, root)
71 for name, kind in osutil.listdir(fullpath):
72 if kind == stat.S_IFDIR:
73 if name == '.hg':
74 if not reporoot:
75 return
76 else:
77 dirs.append(name)
78 path = join(root, name)
79 if dirstate._ignore(path):
80 continue
81 for result in walkit(path, False):
82 yield result
83 elif kind in (stat.S_IFREG, stat.S_IFLNK):
84 files.append(name)
85 yield fullpath, dirs, files
86
87 except OSError, err:
88 if err.errno == errno.ENOTDIR:
89 # fullpath was a directory, but has since been replaced
90 # by a file.
91 yield fullpath, dirs, files
92 elif err.errno not in walk_ignored_errors:
93 raise
94
95 return walkit(root, root == '')
96
97 def _explain_watch_limit(ui, dirstate, rootabs):
45 def _explain_watch_limit(ui, dirstate, rootabs):
98 path = '/proc/sys/fs/inotify/max_user_watches'
46 path = '/proc/sys/fs/inotify/max_user_watches'
99 try:
47 try:
@@ -201,103 +149,10 b' def eventaction(code):'
201 return wrapper
149 return wrapper
202 return decorator
150 return decorator
203
151
204 class directory(object):
152 class repowatcher(server.repowatcher, pollable):
205 """
206 Representing a directory
207
208 * path is the relative path from repo root to this directory
209 * files is a dict listing the files in this directory
210 - keys are file names
211 - values are file status
212 * dirs is a dict listing the subdirectories
213 - key are subdirectories names
214 - values are directory objects
215 """
216 def __init__(self, relpath=''):
217 self.path = relpath
218 self.files = {}
219 self.dirs = {}
220
221 def dir(self, relpath):
222 """
223 Returns the directory contained at the relative path relpath.
224 Creates the intermediate directories if necessary.
225 """
226 if not relpath:
227 return self
228 l = relpath.split('/')
229 ret = self
230 while l:
231 next = l.pop(0)
232 try:
233 ret = ret.dirs[next]
234 except KeyError:
235 d = directory(join(ret.path, next))
236 ret.dirs[next] = d
237 ret = d
238 return ret
239
240 def walk(self, states, visited=None):
241 """
242 yield (filename, status) pairs for items in the trees
243 that have status in states.
244 filenames are relative to the repo root
245 """
246 for file, st in self.files.iteritems():
247 if st in states:
248 yield join(self.path, file), st
249 for dir in self.dirs.itervalues():
250 if visited is not None:
251 visited.add(dir.path)
252 for e in dir.walk(states):
253 yield e
254
255 def lookup(self, states, path, visited):
256 """
257 yield root-relative filenames that match path, and whose
258 status are in states:
259 * if path is a file, yield path
260 * if path is a directory, yield directory files
261 * if path is not tracked, yield nothing
262 """
263 if path[-1] == '/':
264 path = path[:-1]
265
266 paths = path.split('/')
267
268 # we need to check separately for last node
269 last = paths.pop()
270
271 tree = self
272 try:
273 for dir in paths:
274 tree = tree.dirs[dir]
275 except KeyError:
276 # path is not tracked
277 visited.add(tree.path)
278 return
279
280 try:
281 # if path is a directory, walk it
282 target = tree.dirs[last]
283 visited.add(target.path)
284 for file, st in target.walk(states, visited):
285 yield file
286 except KeyError:
287 try:
288 if tree.files[last] in states:
289 # path is a file
290 visited.add(tree.path)
291 yield path
292 except KeyError:
293 # path is not tracked
294 pass
295
296 class repowatcher(pollable):
297 """
153 """
298 Watches inotify events
154 Watches inotify events
299 """
155 """
300 statuskeys = 'almr!?'
301 mask = (
156 mask = (
302 inotify.IN_ATTRIB |
157 inotify.IN_ATTRIB |
303 inotify.IN_CREATE |
158 inotify.IN_CREATE |
@@ -312,11 +167,9 b' class repowatcher(pollable):'
312 0)
167 0)
313
168
314 def __init__(self, ui, dirstate, root):
169 def __init__(self, ui, dirstate, root):
315 self.ui = ui
170 server.repowatcher.__init__(self, ui, dirstate, root)
316 self.dirstate = dirstate
317
171
318 self.wprefix = join(root, '')
172 self.lastevent = {}
319 self.prefixlen = len(self.wprefix)
320 try:
173 try:
321 self.watcher = watcher.watcher()
174 self.watcher = watcher.watcher()
322 except OSError, err:
175 except OSError, err:
@@ -324,18 +177,8 b' class repowatcher(pollable):'
324 err.strerror)
177 err.strerror)
325 self.threshold = watcher.threshold(self.watcher)
178 self.threshold = watcher.threshold(self.watcher)
326 self.fileno = self.watcher.fileno
179 self.fileno = self.watcher.fileno
327
328 self.tree = directory()
329 self.statcache = {}
330 self.statustrees = dict([(s, directory()) for s in self.statuskeys])
331
332 self.last_event = None
333
334 self.lastevent = {}
335
336 self.register(timeout=None)
180 self.register(timeout=None)
337
181
338 self.ds_info = self.dirstate_info()
339 self.handle_timeout()
182 self.handle_timeout()
340 self.scan()
183 self.scan()
341
184
@@ -353,15 +196,6 b' class repowatcher(pollable):'
353 return '+%.2f' % delta
196 return '+%.2f' % delta
354 return '+%.1f' % delta
197 return '+%.1f' % delta
355
198
356 def dirstate_info(self):
357 try:
358 st = os.lstat(self.wprefix + '.hg/dirstate')
359 return st.st_mtime, st.st_ino
360 except OSError, err:
361 if err.errno != errno.ENOENT:
362 raise
363 return 0, 0
364
365 def add_watch(self, path, mask):
199 def add_watch(self, path, mask):
366 if not path:
200 if not path:
367 return
201 return
@@ -382,101 +216,16 b' class repowatcher(pollable):'
382 self.add_watch(self.wprefix + '.hg', inotify.IN_DELETE)
216 self.add_watch(self.wprefix + '.hg', inotify.IN_DELETE)
383 self.check_dirstate()
217 self.check_dirstate()
384
218
385 def filestatus(self, fn, st):
386 try:
387 type_, mode, size, time = self.dirstate._map[fn][:4]
388 except KeyError:
389 type_ = '?'
390 if type_ == 'n':
391 st_mode, st_size, st_mtime = st
392 if size == -1:
393 return 'l'
394 if size and (size != st_size or (mode ^ st_mode) & 0100):
395 return 'm'
396 if time != int(st_mtime):
397 return 'l'
398 return 'n'
399 if type_ == '?' and self.dirstate._ignore(fn):
400 return 'i'
401 return type_
402
403 def updatefile(self, wfn, osstat):
404 '''
405 update the file entry of an existing file.
406
407 osstat: (mode, size, time) tuple, as returned by os.lstat(wfn)
408 '''
409
410 self._updatestatus(wfn, self.filestatus(wfn, osstat))
411
412 def deletefile(self, wfn, oldstatus):
413 '''
414 update the entry of a file which has been deleted.
415
416 oldstatus: char in statuskeys, status of the file before deletion
417 '''
418 if oldstatus == 'r':
419 newstatus = 'r'
420 elif oldstatus in 'almn':
421 newstatus = '!'
422 else:
423 newstatus = None
424
425 self.statcache.pop(wfn, None)
426 self._updatestatus(wfn, newstatus)
427
428 def _updatestatus(self, wfn, newstatus):
429 '''
430 Update the stored status of a file.
431
432 newstatus: - char in (statuskeys + 'ni'), new status to apply.
433 - or None, to stop tracking wfn
434 '''
435 root, fn = split(wfn)
436 d = self.tree.dir(root)
437
438 oldstatus = d.files.get(fn)
439 # oldstatus can be either:
440 # - None : fn is new
441 # - a char in statuskeys: fn is a (tracked) file
442
443 if self.ui.debugflag and oldstatus != newstatus:
444 self.ui.note(_('status: %r %s -> %s\n') %
445 (wfn, oldstatus, newstatus))
446
447 if oldstatus and oldstatus in self.statuskeys \
448 and oldstatus != newstatus:
449 del self.statustrees[oldstatus].dir(root).files[fn]
450
451 if newstatus in (None, 'i'):
452 d.files.pop(fn, None)
453 elif oldstatus != newstatus:
454 d.files[fn] = newstatus
455 if newstatus != 'n':
456 self.statustrees[newstatus].dir(root).files[fn] = newstatus
457
458
459 def check_deleted(self, key):
460 # Files that had been deleted but were present in the dirstate
461 # may have vanished from the dirstate; we must clean them up.
462 nuke = []
463 for wfn, ignore in self.statustrees[key].walk(key):
464 if wfn not in self.dirstate:
465 nuke.append(wfn)
466 for wfn in nuke:
467 root, fn = split(wfn)
468 del self.statustrees[key].dir(root).files[fn]
469 del self.tree.dir(root).files[fn]
470
471 def scan(self, topdir=''):
219 def scan(self, topdir=''):
472 ds = self.dirstate._map.copy()
220 ds = self.dirstate._map.copy()
473 self.add_watch(join(self.wprefix, topdir), self.mask)
221 self.add_watch(server.join(self.wprefix, topdir), self.mask)
474 for root, dirs, files in walk(self.dirstate, self.wprefix, topdir):
222 for root, dirs, files in server.walk(self.dirstate, self.wprefix,
223 topdir):
475 for d in dirs:
224 for d in dirs:
476 self.add_watch(join(root, d), self.mask)
225 self.add_watch(server.join(root, d), self.mask)
477 wroot = root[self.prefixlen:]
226 wroot = root[self.prefixlen:]
478 for fn in files:
227 for fn in files:
479 wfn = join(wroot, fn)
228 wfn = server.join(wroot, fn)
480 self.updatefile(wfn, self.getstat(wfn))
229 self.updatefile(wfn, self.getstat(wfn))
481 ds.pop(wfn, None)
230 ds.pop(wfn, None)
482 wtopdir = topdir
231 wtopdir = topdir
@@ -495,56 +244,6 b' class repowatcher(pollable):'
495 self.check_deleted('!')
244 self.check_deleted('!')
496 self.check_deleted('r')
245 self.check_deleted('r')
497
246
498 def check_dirstate(self):
499 ds_info = self.dirstate_info()
500 if ds_info == self.ds_info:
501 return
502 self.ds_info = ds_info
503 if not self.ui.debugflag:
504 self.last_event = None
505 self.ui.note(_('%s dirstate reload\n') % self.event_time())
506 self.dirstate.invalidate()
507 self.handle_timeout()
508 self.scan()
509 self.ui.note(_('%s end dirstate reload\n') % self.event_time())
510
511 def update_hgignore(self):
512 # An update of the ignore file can potentially change the
513 # states of all unknown and ignored files.
514
515 # XXX If the user has other ignore files outside the repo, or
516 # changes their list of ignore files at run time, we'll
517 # potentially never see changes to them. We could get the
518 # client to report to us what ignore data they're using.
519 # But it's easier to do nothing than to open that can of
520 # worms.
521
522 if '_ignore' in self.dirstate.__dict__:
523 delattr(self.dirstate, '_ignore')
524 self.ui.note(_('rescanning due to .hgignore change\n'))
525 self.handle_timeout()
526 self.scan()
527
528 def getstat(self, wpath):
529 try:
530 return self.statcache[wpath]
531 except KeyError:
532 try:
533 return self.stat(wpath)
534 except OSError, err:
535 if err.errno != errno.ENOENT:
536 raise
537
538 def stat(self, wpath):
539 try:
540 st = os.lstat(join(self.wprefix, wpath))
541 ret = st.st_mode, st.st_size, st.st_mtime
542 self.statcache[wpath] = ret
543 return ret
544 except OSError:
545 self.statcache.pop(wpath, None)
546 raise
547
548 @eventaction('c')
247 @eventaction('c')
549 def created(self, wpath):
248 def created(self, wpath):
550 if wpath == '.hgignore':
249 if wpath == '.hgignore':
@@ -677,136 +376,20 b' class repowatcher(pollable):'
677 """
376 """
678 return sorted(tuple[0][self.prefixlen:] for tuple in self.watcher)
377 return sorted(tuple[0][self.prefixlen:] for tuple in self.watcher)
679
378
680 class server(pollable):
379 class socketlistener(server.socketlistener, pollable):
681 """
380 """
682 Listens for client queries on unix socket inotify.sock
381 Listens for client queries on unix socket inotify.sock
683 """
382 """
684 def __init__(self, ui, root, repowatcher, timeout):
383 def __init__(self, ui, root, repowatcher, timeout):
685 self.ui = ui
384 server.socketlistener.__init__(self, ui, root, repowatcher, timeout)
686 self.repowatcher = repowatcher
687 self.sock = socket.socket(socket.AF_UNIX)
688 self.sockpath = join(root, '.hg/inotify.sock')
689 self.realsockpath = None
690 try:
691 self.sock.bind(self.sockpath)
692 except socket.error, err:
693 if err[0] == errno.EADDRINUSE:
694 raise AlreadyStartedException( _('cannot start: socket is '
695 'already bound'))
696 if err[0] == "AF_UNIX path too long":
697 if os.path.islink(self.sockpath) and \
698 not os.path.exists(self.sockpath):
699 raise util.Abort('inotify-server: cannot start: '
700 '.hg/inotify.sock is a broken symlink')
701 tempdir = tempfile.mkdtemp(prefix="hg-inotify-")
702 self.realsockpath = os.path.join(tempdir, "inotify.sock")
703 try:
704 self.sock.bind(self.realsockpath)
705 os.symlink(self.realsockpath, self.sockpath)
706 except (OSError, socket.error), inst:
707 try:
708 os.unlink(self.realsockpath)
709 except:
710 pass
711 os.rmdir(tempdir)
712 if inst.errno == errno.EEXIST:
713 raise AlreadyStartedException(_('cannot start: tried '
714 'linking .hg/inotify.sock to a temporary socket but'
715 ' .hg/inotify.sock already exists'))
716 raise
717 else:
718 raise
719 self.sock.listen(5)
720 self.fileno = self.sock.fileno
721 self.register(timeout=timeout)
385 self.register(timeout=timeout)
722
386
723 def handle_timeout(self):
387 def handle_timeout(self):
724 pass
388 pass
725
389
726 def answer_stat_query(self, cs):
727 names = cs.read().split('\0')
728
729 states = names.pop()
730
731 self.ui.note(_('answering query for %r\n') % states)
732
733 if self.repowatcher.timeout:
734 # We got a query while a rescan is pending. Make sure we
735 # rescan before responding, or we could give back a wrong
736 # answer.
737 self.repowatcher.handle_timeout()
738
739 visited = set()
740 if not names:
741 def genresult(states, tree):
742 for fn, state in tree.walk(states):
743 yield fn
744 else:
745 def genresult(states, tree):
746 for fn in names:
747 for f in tree.lookup(states, fn, visited):
748 yield f
749
750 return ['\0'.join(r) for r in [
751 genresult('l', self.repowatcher.statustrees['l']),
752 genresult('m', self.repowatcher.statustrees['m']),
753 genresult('a', self.repowatcher.statustrees['a']),
754 genresult('r', self.repowatcher.statustrees['r']),
755 genresult('!', self.repowatcher.statustrees['!']),
756 '?' in states
757 and genresult('?', self.repowatcher.statustrees['?'])
758 or [],
759 [],
760 'c' in states and genresult('n', self.repowatcher.tree) or [],
761 visited
762 ]]
763
764 def answer_dbug_query(self):
765 return ['\0'.join(self.repowatcher.debug())]
766
767 def handle_pollevents(self, events):
390 def handle_pollevents(self, events):
768 for e in events:
391 for e in events:
769 self.handle_pollevent()
392 self.accept_connection()
770
771 def handle_pollevent(self):
772 sock, addr = self.sock.accept()
773
774 cs = common.recvcs(sock)
775 version = ord(cs.read(1))
776
777 if version != common.version:
778 self.ui.warn(_('received query from incompatible client '
779 'version %d\n') % version)
780 try:
781 # try to send back our version to the client
782 # this way, the client too is informed of the mismatch
783 sock.sendall(chr(common.version))
784 except:
785 pass
786 return
787
788 type = cs.read(4)
789
790 if type == 'STAT':
791 results = self.answer_stat_query(cs)
792 elif type == 'DBUG':
793 results = self.answer_dbug_query()
794 else:
795 self.ui.warn(_('unrecognized query type: %s\n') % type)
796 return
797
798 try:
799 try:
800 v = chr(common.version)
801
802 sock.sendall(v + type + struct.pack(common.resphdrfmts[type],
803 *map(len, results)))
804 sock.sendall(''.join(results))
805 finally:
806 sock.shutdown(socket.SHUT_WR)
807 except socket.error, err:
808 if err[0] != errno.EPIPE:
809 raise
810
393
811 def shutdown(self):
394 def shutdown(self):
812 self.sock.close()
395 self.sock.close()
@@ -819,11 +402,20 b' class server(pollable):'
819 if err.errno != errno.ENOENT:
402 if err.errno != errno.ENOENT:
820 raise
403 raise
821
404
405 def answer_stat_query(self, cs):
406 if self.repowatcher.timeout:
407 # We got a query while a rescan is pending. Make sure we
408 # rescan before responding, or we could give back a wrong
409 # answer.
410 self.repowatcher.handle_timeout()
411 return server.socketlistener.answer_stat_query(self, cs)
412
822 class master(object):
413 class master(object):
823 def __init__(self, ui, dirstate, root, timeout=None):
414 def __init__(self, ui, dirstate, root, timeout=None):
824 self.ui = ui
415 self.ui = ui
825 self.repowatcher = repowatcher(ui, dirstate, root)
416 self.repowatcher = repowatcher(ui, dirstate, root)
826 self.server = server(ui, root, self.repowatcher, timeout)
417 self.socketlistener = socketlistener(ui, root, self.repowatcher,
418 timeout)
827
419
828 def shutdown(self):
420 def shutdown(self):
829 for obj in pollable.instances.itervalues():
421 for obj in pollable.instances.itervalues():
@@ -835,35 +427,3 b' class master(object):'
835 if os.getenv('TIME_STARTUP'):
427 if os.getenv('TIME_STARTUP'):
836 sys.exit(0)
428 sys.exit(0)
837 pollable.run()
429 pollable.run()
838
839 def start(ui, dirstate, root, opts):
840 timeout = opts.get('timeout')
841 if timeout:
842 timeout = float(timeout) * 1e3
843
844 class service(object):
845 def init(self):
846 try:
847 self.master = master(ui, dirstate, root, timeout)
848 except AlreadyStartedException, inst:
849 raise util.Abort("inotify-server: %s" % inst)
850
851 def run(self):
852 try:
853 self.master.run()
854 finally:
855 self.master.shutdown()
856
857 if 'inserve' not in sys.argv:
858 runargs = [sys.argv[0], 'inserve', '-R', root]
859 else:
860 runargs = sys.argv[:]
861
862 pidfile = ui.config('inotify', 'pidfile')
863 if opts['daemon'] and pidfile is not None and 'pid-file' not in runargs:
864 runargs.append("--pid-file=%s" % pidfile)
865
866 service = service()
867 logfile = ui.config('inotify', 'log')
868 cmdutil.service(opts, initfn=service.init, runfn=service.run,
869 logfile=logfile, runargs=runargs)
@@ -1,7 +1,6 b''
1 # server.py - inotify status server
1 # server.py - common entry point for inotify status server
2 #
2 #
3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
3 # Copyright 2009 Nicolas Dumazet <nicdumz@gmail.com>
4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 #
4 #
6 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2, incorporated herein by reference.
6 # GNU General Public License version 2, incorporated herein by reference.
@@ -9,13 +8,14 b''
9 from mercurial.i18n import _
8 from mercurial.i18n import _
10 from mercurial import cmdutil, osutil, util
9 from mercurial import cmdutil, osutil, util
11 import common
10 import common
12 import errno, os, select, socket, stat, struct, sys, tempfile, time
13
11
14 try:
12 import errno
15 import linux as inotify
13 import os
16 from linux import watcher
14 import socket
17 except ImportError:
15 import stat
18 raise
16 import struct
17 import sys
18 import tempfile
19
19
20 class AlreadyStartedException(Exception): pass
20 class AlreadyStartedException(Exception): pass
21
21
@@ -34,30 +34,6 b' def split(path):'
34
34
35 walk_ignored_errors = (errno.ENOENT, errno.ENAMETOOLONG)
35 walk_ignored_errors = (errno.ENOENT, errno.ENAMETOOLONG)
36
36
37 def walkrepodirs(dirstate, absroot):
38 '''Iterate over all subdirectories of this repo.
39 Exclude the .hg directory, any nested repos, and ignored dirs.'''
40 def walkit(dirname, top):
41 fullpath = join(absroot, dirname)
42 try:
43 for name, kind in osutil.listdir(fullpath):
44 if kind == stat.S_IFDIR:
45 if name == '.hg':
46 if not top:
47 return
48 else:
49 d = join(dirname, name)
50 if dirstate._ignore(d):
51 continue
52 for subdir in walkit(d, False):
53 yield subdir
54 except OSError, err:
55 if err.errno not in walk_ignored_errors:
56 raise
57 yield fullpath
58
59 return walkit('', True)
60
61 def walk(dirstate, absroot, root):
37 def walk(dirstate, absroot, root):
62 '''Like os.walk, but only yields regular files.'''
38 '''Like os.walk, but only yields regular files.'''
63
39
@@ -94,113 +70,6 b' def walk(dirstate, absroot, root):'
94
70
95 return walkit(root, root == '')
71 return walkit(root, root == '')
96
72
97 def _explain_watch_limit(ui, dirstate, rootabs):
98 path = '/proc/sys/fs/inotify/max_user_watches'
99 try:
100 limit = int(file(path).read())
101 except IOError, err:
102 if err.errno != errno.ENOENT:
103 raise
104 raise util.Abort(_('this system does not seem to '
105 'support inotify'))
106 ui.warn(_('*** the current per-user limit on the number '
107 'of inotify watches is %s\n') % limit)
108 ui.warn(_('*** this limit is too low to watch every '
109 'directory in this repository\n'))
110 ui.warn(_('*** counting directories: '))
111 ndirs = len(list(walkrepodirs(dirstate, rootabs)))
112 ui.warn(_('found %d\n') % ndirs)
113 newlimit = min(limit, 1024)
114 while newlimit < ((limit + ndirs) * 1.1):
115 newlimit *= 2
116 ui.warn(_('*** to raise the limit from %d to %d (run as root):\n') %
117 (limit, newlimit))
118 ui.warn(_('*** echo %d > %s\n') % (newlimit, path))
119 raise util.Abort(_('cannot watch %s until inotify watch limit is raised')
120 % rootabs)
121
122 class pollable(object):
123 """
124 Interface to support polling.
125 The file descriptor returned by fileno() is registered to a polling
126 object.
127 Usage:
128 Every tick, check if an event has happened since the last tick:
129 * If yes, call handle_events
130 * If no, call handle_timeout
131 """
132 poll_events = select.POLLIN
133 instances = {}
134 poll = select.poll()
135
136 def fileno(self):
137 raise NotImplementedError
138
139 def handle_events(self, events):
140 raise NotImplementedError
141
142 def handle_timeout(self):
143 raise NotImplementedError
144
145 def shutdown(self):
146 raise NotImplementedError
147
148 def register(self, timeout):
149 fd = self.fileno()
150
151 pollable.poll.register(fd, pollable.poll_events)
152 pollable.instances[fd] = self
153
154 self.registered = True
155 self.timeout = timeout
156
157 def unregister(self):
158 pollable.poll.unregister(self)
159 self.registered = False
160
161 @classmethod
162 def run(cls):
163 while True:
164 timeout = None
165 timeobj = None
166 for obj in cls.instances.itervalues():
167 if obj.timeout is not None and (timeout is None or obj.timeout < timeout):
168 timeout, timeobj = obj.timeout, obj
169 try:
170 events = cls.poll.poll(timeout)
171 except select.error, err:
172 if err[0] == errno.EINTR:
173 continue
174 raise
175 if events:
176 by_fd = {}
177 for fd, event in events:
178 by_fd.setdefault(fd, []).append(event)
179
180 for fd, events in by_fd.iteritems():
181 cls.instances[fd].handle_pollevents(events)
182
183 elif timeobj:
184 timeobj.handle_timeout()
185
186 def eventaction(code):
187 """
188 Decorator to help handle events in repowatcher
189 """
190 def decorator(f):
191 def wrapper(self, wpath):
192 if code == 'm' and wpath in self.lastevent and \
193 self.lastevent[wpath] in 'cm':
194 return
195 self.lastevent[wpath] = code
196 self.timeout = 250
197
198 f(self, wpath)
199
200 wrapper.func_name = f.func_name
201 return wrapper
202 return decorator
203
204 class directory(object):
73 class directory(object):
205 """
74 """
206 Representing a directory
75 Representing a directory
@@ -293,23 +162,11 b' class directory(object):'
293 # path is not tracked
162 # path is not tracked
294 pass
163 pass
295
164
296 class repowatcher(pollable):
165 class repowatcher(object):
297 """
166 """
298 Watches inotify events
167 Watches inotify events
299 """
168 """
300 statuskeys = 'almr!?'
169 statuskeys = 'almr!?'
301 mask = (
302 inotify.IN_ATTRIB |
303 inotify.IN_CREATE |
304 inotify.IN_DELETE |
305 inotify.IN_DELETE_SELF |
306 inotify.IN_MODIFY |
307 inotify.IN_MOVED_FROM |
308 inotify.IN_MOVED_TO |
309 inotify.IN_MOVE_SELF |
310 inotify.IN_ONLYDIR |
311 inotify.IN_UNMOUNT |
312 0)
313
170
314 def __init__(self, ui, dirstate, root):
171 def __init__(self, ui, dirstate, root):
315 self.ui = ui
172 self.ui = ui
@@ -317,41 +174,18 b' class repowatcher(pollable):'
317
174
318 self.wprefix = join(root, '')
175 self.wprefix = join(root, '')
319 self.prefixlen = len(self.wprefix)
176 self.prefixlen = len(self.wprefix)
320 try:
321 self.watcher = watcher.watcher()
322 except OSError, err:
323 raise util.Abort(_('inotify service not available: %s') %
324 err.strerror)
325 self.threshold = watcher.threshold(self.watcher)
326 self.fileno = self.watcher.fileno
327
177
328 self.tree = directory()
178 self.tree = directory()
329 self.statcache = {}
179 self.statcache = {}
330 self.statustrees = dict([(s, directory()) for s in self.statuskeys])
180 self.statustrees = dict([(s, directory()) for s in self.statuskeys])
331
181
182 self.ds_info = self.dirstate_info()
183
332 self.last_event = None
184 self.last_event = None
333
185
334 self.lastevent = {}
335
186
336 self.register(timeout=None)
187 def handle_timeout(self):
337
188 pass
338 self.ds_info = self.dirstate_info()
339 self.handle_timeout()
340 self.scan()
341
342 def event_time(self):
343 last = self.last_event
344 now = time.time()
345 self.last_event = now
346
347 if last is None:
348 return 'start'
349 delta = now - last
350 if delta < 5:
351 return '+%.3f' % delta
352 if delta < 50:
353 return '+%.2f' % delta
354 return '+%.1f' % delta
355
189
356 def dirstate_info(self):
190 def dirstate_info(self):
357 try:
191 try:
@@ -362,26 +196,6 b' class repowatcher(pollable):'
362 raise
196 raise
363 return 0, 0
197 return 0, 0
364
198
365 def add_watch(self, path, mask):
366 if not path:
367 return
368 if self.watcher.path(path) is None:
369 if self.ui.debugflag:
370 self.ui.note(_('watching %r\n') % path[self.prefixlen:])
371 try:
372 self.watcher.add(path, mask)
373 except OSError, err:
374 if err.errno in (errno.ENOENT, errno.ENOTDIR):
375 return
376 if err.errno != errno.ENOSPC:
377 raise
378 _explain_watch_limit(self.ui, self.dirstate, self.wprefix)
379
380 def setup(self):
381 self.ui.note(_('watching directories under %r\n') % self.wprefix)
382 self.add_watch(self.wprefix + '.hg', inotify.IN_DELETE)
383 self.check_dirstate()
384
385 def filestatus(self, fn, st):
199 def filestatus(self, fn, st):
386 try:
200 try:
387 type_, mode, size, time = self.dirstate._map[fn][:4]
201 type_, mode, size, time = self.dirstate._map[fn][:4]
@@ -455,7 +269,6 b' class repowatcher(pollable):'
455 if newstatus != 'n':
269 if newstatus != 'n':
456 self.statustrees[newstatus].dir(root).files[fn] = newstatus
270 self.statustrees[newstatus].dir(root).files[fn] = newstatus
457
271
458
459 def check_deleted(self, key):
272 def check_deleted(self, key):
460 # Files that had been deleted but were present in the dirstate
273 # Files that had been deleted but were present in the dirstate
461 # may have vanished from the dirstate; we must clean them up.
274 # may have vanished from the dirstate; we must clean them up.
@@ -468,33 +281,6 b' class repowatcher(pollable):'
468 del self.statustrees[key].dir(root).files[fn]
281 del self.statustrees[key].dir(root).files[fn]
469 del self.tree.dir(root).files[fn]
282 del self.tree.dir(root).files[fn]
470
283
471 def scan(self, topdir=''):
472 ds = self.dirstate._map.copy()
473 self.add_watch(join(self.wprefix, topdir), self.mask)
474 for root, dirs, files in walk(self.dirstate, self.wprefix, topdir):
475 for d in dirs:
476 self.add_watch(join(root, d), self.mask)
477 wroot = root[self.prefixlen:]
478 for fn in files:
479 wfn = join(wroot, fn)
480 self.updatefile(wfn, self.getstat(wfn))
481 ds.pop(wfn, None)
482 wtopdir = topdir
483 if wtopdir and wtopdir[-1] != '/':
484 wtopdir += '/'
485 for wfn, state in ds.iteritems():
486 if not wfn.startswith(wtopdir):
487 continue
488 try:
489 st = self.stat(wfn)
490 except OSError:
491 status = state[0]
492 self.deletefile(wfn, status)
493 else:
494 self.updatefile(wfn, st)
495 self.check_deleted('!')
496 self.check_deleted('r')
497
498 def check_dirstate(self):
284 def check_dirstate(self):
499 ds_info = self.dirstate_info()
285 ds_info = self.dirstate_info()
500 if ds_info == self.ds_info:
286 if ds_info == self.ds_info:
@@ -502,11 +288,9 b' class repowatcher(pollable):'
502 self.ds_info = ds_info
288 self.ds_info = ds_info
503 if not self.ui.debugflag:
289 if not self.ui.debugflag:
504 self.last_event = None
290 self.last_event = None
505 self.ui.note(_('%s dirstate reload\n') % self.event_time())
506 self.dirstate.invalidate()
291 self.dirstate.invalidate()
507 self.handle_timeout()
292 self.handle_timeout()
508 self.scan()
293 self.scan()
509 self.ui.note(_('%s end dirstate reload\n') % self.event_time())
510
294
511 def update_hgignore(self):
295 def update_hgignore(self):
512 # An update of the ignore file can potentially change the
296 # An update of the ignore file can potentially change the
@@ -545,139 +329,7 b' class repowatcher(pollable):'
545 self.statcache.pop(wpath, None)
329 self.statcache.pop(wpath, None)
546 raise
330 raise
547
331
548 @eventaction('c')
332 class socketlistener(object):
549 def created(self, wpath):
550 if wpath == '.hgignore':
551 self.update_hgignore()
552 try:
553 st = self.stat(wpath)
554 if stat.S_ISREG(st[0]):
555 self.updatefile(wpath, st)
556 except OSError:
557 pass
558
559 @eventaction('m')
560 def modified(self, wpath):
561 if wpath == '.hgignore':
562 self.update_hgignore()
563 try:
564 st = self.stat(wpath)
565 if stat.S_ISREG(st[0]):
566 if self.dirstate[wpath] in 'lmn':
567 self.updatefile(wpath, st)
568 except OSError:
569 pass
570
571 @eventaction('d')
572 def deleted(self, wpath):
573 if wpath == '.hgignore':
574 self.update_hgignore()
575 elif wpath.startswith('.hg/'):
576 if wpath == '.hg/wlock':
577 self.check_dirstate()
578 return
579
580 self.deletefile(wpath, self.dirstate[wpath])
581
582 def process_create(self, wpath, evt):
583 if self.ui.debugflag:
584 self.ui.note(_('%s event: created %s\n') %
585 (self.event_time(), wpath))
586
587 if evt.mask & inotify.IN_ISDIR:
588 self.scan(wpath)
589 else:
590 self.created(wpath)
591
592 def process_delete(self, wpath, evt):
593 if self.ui.debugflag:
594 self.ui.note(_('%s event: deleted %s\n') %
595 (self.event_time(), wpath))
596
597 if evt.mask & inotify.IN_ISDIR:
598 tree = self.tree.dir(wpath)
599 todelete = [wfn for wfn, ignore in tree.walk('?')]
600 for fn in todelete:
601 self.deletefile(fn, '?')
602 self.scan(wpath)
603 else:
604 self.deleted(wpath)
605
606 def process_modify(self, wpath, evt):
607 if self.ui.debugflag:
608 self.ui.note(_('%s event: modified %s\n') %
609 (self.event_time(), wpath))
610
611 if not (evt.mask & inotify.IN_ISDIR):
612 self.modified(wpath)
613
614 def process_unmount(self, evt):
615 self.ui.warn(_('filesystem containing %s was unmounted\n') %
616 evt.fullpath)
617 sys.exit(0)
618
619 def handle_pollevents(self, events):
620 if self.ui.debugflag:
621 self.ui.note(_('%s readable: %d bytes\n') %
622 (self.event_time(), self.threshold.readable()))
623 if not self.threshold():
624 if self.registered:
625 if self.ui.debugflag:
626 self.ui.note(_('%s below threshold - unhooking\n') %
627 (self.event_time()))
628 self.unregister()
629 self.timeout = 250
630 else:
631 self.read_events()
632
633 def read_events(self, bufsize=None):
634 events = self.watcher.read(bufsize)
635 if self.ui.debugflag:
636 self.ui.note(_('%s reading %d events\n') %
637 (self.event_time(), len(events)))
638 for evt in events:
639 assert evt.fullpath.startswith(self.wprefix)
640 wpath = evt.fullpath[self.prefixlen:]
641
642 # paths have been normalized, wpath never ends with a '/'
643
644 if wpath.startswith('.hg/') and evt.mask & inotify.IN_ISDIR:
645 # ignore subdirectories of .hg/ (merge, patches...)
646 continue
647
648 if evt.mask & inotify.IN_UNMOUNT:
649 self.process_unmount(wpath, evt)
650 elif evt.mask & (inotify.IN_MODIFY | inotify.IN_ATTRIB):
651 self.process_modify(wpath, evt)
652 elif evt.mask & (inotify.IN_DELETE | inotify.IN_DELETE_SELF |
653 inotify.IN_MOVED_FROM):
654 self.process_delete(wpath, evt)
655 elif evt.mask & (inotify.IN_CREATE | inotify.IN_MOVED_TO):
656 self.process_create(wpath, evt)
657
658 self.lastevent.clear()
659
660 def handle_timeout(self):
661 if not self.registered:
662 if self.ui.debugflag:
663 self.ui.note(_('%s hooking back up with %d bytes readable\n') %
664 (self.event_time(), self.threshold.readable()))
665 self.read_events(0)
666 self.register(timeout=None)
667
668 self.timeout = None
669
670 def shutdown(self):
671 self.watcher.close()
672
673 def debug(self):
674 """
675 Returns a sorted list of relatives paths currently watched,
676 for debugging purposes.
677 """
678 return sorted(tuple[0][self.prefixlen:] for tuple in self.watcher)
679
680 class server(pollable):
681 """
333 """
682 Listens for client queries on unix socket inotify.sock
334 Listens for client queries on unix socket inotify.sock
683 """
335 """
@@ -718,10 +370,6 b' class server(pollable):'
718 raise
370 raise
719 self.sock.listen(5)
371 self.sock.listen(5)
720 self.fileno = self.sock.fileno
372 self.fileno = self.sock.fileno
721 self.register(timeout=timeout)
722
723 def handle_timeout(self):
724 pass
725
373
726 def answer_stat_query(self, cs):
374 def answer_stat_query(self, cs):
727 names = cs.read().split('\0')
375 names = cs.read().split('\0')
@@ -730,12 +378,6 b' class server(pollable):'
730
378
731 self.ui.note(_('answering query for %r\n') % states)
379 self.ui.note(_('answering query for %r\n') % states)
732
380
733 if self.repowatcher.timeout:
734 # We got a query while a rescan is pending. Make sure we
735 # rescan before responding, or we could give back a wrong
736 # answer.
737 self.repowatcher.handle_timeout()
738
739 visited = set()
381 visited = set()
740 if not names:
382 if not names:
741 def genresult(states, tree):
383 def genresult(states, tree):
@@ -764,11 +406,7 b' class server(pollable):'
764 def answer_dbug_query(self):
406 def answer_dbug_query(self):
765 return ['\0'.join(self.repowatcher.debug())]
407 return ['\0'.join(self.repowatcher.debug())]
766
408
767 def handle_pollevents(self, events):
409 def accept_connection(self):
768 for e in events:
769 self.handle_pollevent()
770
771 def handle_pollevent(self):
772 sock, addr = self.sock.accept()
410 sock, addr = self.sock.accept()
773
411
774 cs = common.recvcs(sock)
412 cs = common.recvcs(sock)
@@ -808,33 +446,12 b' class server(pollable):'
808 if err[0] != errno.EPIPE:
446 if err[0] != errno.EPIPE:
809 raise
447 raise
810
448
811 def shutdown(self):
449 if sys.platform == 'linux2':
812 self.sock.close()
450 import linuxserver as _server
813 try:
451 else:
814 os.unlink(self.sockpath)
452 raise ImportError
815 if self.realsockpath:
816 os.unlink(self.realsockpath)
817 os.rmdir(os.path.dirname(self.realsockpath))
818 except OSError, err:
819 if err.errno != errno.ENOENT:
820 raise
821
453
822 class master(object):
454 master = _server.master
823 def __init__(self, ui, dirstate, root, timeout=None):
824 self.ui = ui
825 self.repowatcher = repowatcher(ui, dirstate, root)
826 self.server = server(ui, root, self.repowatcher, timeout)
827
828 def shutdown(self):
829 for obj in pollable.instances.itervalues():
830 obj.shutdown()
831
832 def run(self):
833 self.repowatcher.setup()
834 self.ui.note(_('finished setup\n'))
835 if os.getenv('TIME_STARTUP'):
836 sys.exit(0)
837 pollable.run()
838
455
839 def start(ui, dirstate, root, opts):
456 def start(ui, dirstate, root, opts):
840 timeout = opts.get('timeout')
457 timeout = opts.get('timeout')
General Comments 0
You need to be logged in to leave comments. Login now