##// END OF EJS Templates
inotify: introduce debuginotify, which lists which paths are under watch
Nicolas Dumazet -
r8555:3e09bc5f default
parent child Browse files
Show More
@@ -0,0 +1,32 b''
1 #!/bin/sh
2
3 "$TESTDIR/hghave" inotify || exit 80
4
5 hg init
6
7 echo "[extensions]" >> $HGRCPATH
8 echo "inotify=" >> $HGRCPATH
9
10 echo % inserve
11 hg inserve -d --pid-file=hg.pid
12 cat hg.pid >> "$DAEMON_PIDS"
13
14 # let the daemon finish its stuff
15 sleep 1
16
17 echo % empty
18 hg debuginotify
19
20 mkdir a
21 sleep 1
22
23 echo % only 'a'
24 hg debuginotify
25
26 rmdir a
27 sleep 1
28
29 echo % empty again
30 hg debuginotify
31
32 kill `cat hg.pid`
@@ -0,0 +1,14 b''
1 % inserve
2 % empty
3 directories being watched:
4 /
5 .hg/
6 % only a
7 directories being watched:
8 /
9 .hg/
10 a/
11 % empty again
12 directories being watched:
13 /
14 .hg/
@@ -1,92 +1,106 b''
1 1 # __init__.py - inotify-based status acceleration for Linux
2 2 #
3 3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2, incorporated herein by reference.
8 8
9 9 '''inotify-based status acceleration for Linux systems
10 10 '''
11 11
12 12 # todo: socket permissions
13 13
14 14 from mercurial.i18n import _
15 15 from mercurial import cmdutil, util
16 16 import os, server
17 17 from weakref import proxy
18 18 from client import client, QueryFailed
19 19
20 20 def serve(ui, repo, **opts):
21 21 '''start an inotify server for this repository'''
22 22 timeout = opts.get('timeout')
23 23 if timeout:
24 24 timeout = float(timeout) * 1e3
25 25
26 26 class service:
27 27 def init(self):
28 28 try:
29 29 self.master = server.master(ui, repo, timeout)
30 30 except server.AlreadyStartedException, inst:
31 31 raise util.Abort(str(inst))
32 32
33 33 def run(self):
34 34 try:
35 35 self.master.run()
36 36 finally:
37 37 self.master.shutdown()
38 38
39 39 service = service()
40 40 cmdutil.service(opts, initfn=service.init, runfn=service.run)
41 41
42 def debuginotify(ui, repo, **opts):
43 '''debugging information for inotify extension
44
45 Prints the list of directories being watched by the inotify server.
46 '''
47 cli = client(ui, repo)
48 response = cli.debugquery()
49
50 ui.write(_('directories being watched:\n'))
51 for path in response:
52 ui.write((' %s/\n') % path)
53
42 54 def reposetup(ui, repo):
43 55 if not hasattr(repo, 'dirstate'):
44 56 return
45 57
46 58 # XXX: weakref until hg stops relying on __del__
47 59 repo = proxy(repo)
48 60
49 61 class inotifydirstate(repo.dirstate.__class__):
50 62 # Set to True if we're the inotify server, so we don't attempt
51 63 # to recurse.
52 64 inotifyserver = False
53 65
54 66 def status(self, match, ignored, clean, unknown=True):
55 67 files = match.files()
56 68 if '.' in files:
57 69 files = []
58 70 if not ignored and not self.inotifyserver:
59 71 cli = client(ui, repo)
60 72 try:
61 73 result = cli.statusquery(files, match, False,
62 74 clean, unknown)
63 75 except QueryFailed, instr:
64 76 ui.debug(str(instr))
65 77 pass
66 78 else:
67 79 if ui.config('inotify', 'debug'):
68 80 r2 = super(inotifydirstate, self).status(
69 81 match, False, clean, unknown)
70 82 for c,a,b in zip('LMARDUIC', result, r2):
71 83 for f in a:
72 84 if f not in b:
73 85 ui.warn('*** inotify: %s +%s\n' % (c, f))
74 86 for f in b:
75 87 if f not in a:
76 88 ui.warn('*** inotify: %s -%s\n' % (c, f))
77 89 result = r2
78 90 return result
79 91 return super(inotifydirstate, self).status(
80 92 match, ignored, clean, unknown)
81 93
82 94 repo.dirstate.__class__ = inotifydirstate
83 95
84 96 cmdtable = {
97 'debuginotify':
98 (debuginotify, [], ('hg debuginotify')),
85 99 '^inserve':
86 (serve,
87 [('d', 'daemon', None, _('run server in background')),
88 ('', 'daemon-pipefds', '', _('used internally by daemon mode')),
89 ('t', 'idle-timeout', '', _('minutes to sit idle before exiting')),
90 ('', 'pid-file', '', _('name of file to write process ID to'))],
91 _('hg inserve [OPT]...')),
100 (serve,
101 [('d', 'daemon', None, _('run server in background')),
102 ('', 'daemon-pipefds', '', _('used internally by daemon mode')),
103 ('t', 'idle-timeout', '', _('minutes to sit idle before exiting')),
104 ('', 'pid-file', '', _('name of file to write process ID to'))],
105 _('hg inserve [OPT]...')),
92 106 }
@@ -1,146 +1,153 b''
1 1 # client.py - inotify status client
2 2 #
3 3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 5 # Copyright 2009 Nicolas Dumazet <nicdumz@gmail.com>
6 6 #
7 7 # This software may be used and distributed according to the terms of the
8 8 # GNU General Public License version 2, incorporated herein by reference.
9 9
10 10 from mercurial.i18n import _
11 11 import common, server
12 12 import errno, os, socket, struct
13 13
14 14 class QueryFailed(Exception): pass
15 15
16 16 def start_server(function):
17 17 """
18 18 Decorator.
19 19 Tries to call function, if it fails, try to (re)start inotify server.
20 20 Raise QueryFailed if something went wrong
21 21 """
22 22 def decorated_function(self, *args):
23 23 result = None
24 24 try:
25 25 return function(self, *args)
26 26 except (OSError, socket.error), err:
27 27 autostart = self.ui.configbool('inotify', 'autostart', True)
28 28
29 29 if err[0] == errno.ECONNREFUSED:
30 30 self.ui.warn(_('(found dead inotify server socket; '
31 31 'removing it)\n'))
32 32 os.unlink(self.repo.join('inotify.sock'))
33 33 if err[0] in (errno.ECONNREFUSED, errno.ENOENT) and autostart:
34 34 self.ui.debug(_('(starting inotify server)\n'))
35 35 try:
36 36 try:
37 37 server.start(self.ui, self.repo)
38 38 except server.AlreadyStartedException, inst:
39 39 # another process may have started its own
40 40 # inotify server while this one was starting.
41 41 self.ui.debug(str(inst))
42 42 except Exception, inst:
43 43 self.ui.warn(_('could not start inotify server: '
44 44 '%s\n') % inst)
45 45 else:
46 46 try:
47 47 return function(self, *args)
48 48 except socket.error, err:
49 49 self.ui.warn(_('could not talk to new inotify '
50 50 'server: %s\n') % err[-1])
51 51 elif err[0] in (errno.ECONNREFUSED, errno.ENOENT):
52 52 # silently ignore normal errors if autostart is False
53 53 self.ui.debug(_('(inotify server not running)\n'))
54 54 else:
55 55 self.ui.warn(_('failed to contact inotify server: %s\n')
56 56 % err[-1])
57 57
58 58 self.ui.traceback()
59 59 raise QueryFailed('inotify query failed')
60 60
61 61 return decorated_function
62 62
63 63
64 64 class client(object):
65 65 def __init__(self, ui, repo):
66 66 self.ui = ui
67 67 self.repo = repo
68 68 self.sock = socket.socket(socket.AF_UNIX)
69 69
70 70 def _connect(self):
71 71 sockpath = self.repo.join('inotify.sock')
72 72 try:
73 73 self.sock.connect(sockpath)
74 74 except socket.error, err:
75 75 if err[0] == "AF_UNIX path too long":
76 76 sockpath = os.readlink(sockpath)
77 77 self.sock.connect(sockpath)
78 78 else:
79 79 raise
80 80
81 81 def _send(self, type, data):
82 82 """Sends protocol version number, and the data"""
83 83 self.sock.sendall(chr(common.version) + type + data)
84 84
85 85 self.sock.shutdown(socket.SHUT_WR)
86 86
87 87 def _receive(self, type):
88 88 """
89 89 Read data, check version number, extract headers,
90 90 and returns a tuple (data descriptor, header)
91 91 Raises QueryFailed on error
92 92 """
93 93 cs = common.recvcs(self.sock)
94 94 version = ord(cs.read(1))
95 95 if version != common.version:
96 96 self.ui.warn(_('(inotify: received response from incompatible '
97 97 'server version %d)\n') % version)
98 98 raise QueryFailed('incompatible server version')
99 99
100 100 readtype = cs.read(4)
101 101 if readtype != type:
102 102 self.ui.warn(_('(inotify: received \'%s\' response when expecting'
103 103 ' \'%s\')\n') % (readtype, type))
104 104 raise QueryFailed('wrong response type')
105 105
106 106 hdrfmt = common.resphdrfmts[type]
107 107 hdrsize = common.resphdrsizes[type]
108 108 try:
109 109 resphdr = struct.unpack(hdrfmt, cs.read(hdrsize))
110 110 except struct.error:
111 111 raise QueryFailed('unable to retrieve query response headers')
112 112
113 113 return cs, resphdr
114 114
115 115 def query(self, type, req):
116 116 self._connect()
117 117
118 118 self._send(type, req)
119 119
120 120 return self._receive(type)
121 121
122 122 @start_server
123 123 def statusquery(self, names, match, ignored, clean, unknown=True):
124 124
125 125 def genquery():
126 126 for n in names:
127 127 yield n
128 128 states = 'almrx!'
129 129 if ignored:
130 130 raise ValueError('this is insanity')
131 131 if clean: states += 'c'
132 132 if unknown: states += '?'
133 133 yield states
134 134
135 135 req = '\0'.join(genquery())
136 136
137 137 cs, resphdr = self.query('STAT', req)
138 138
139 139 def readnames(nbytes):
140 140 if nbytes:
141 141 names = cs.read(nbytes)
142 142 if names:
143 143 return filter(match, names.split('\0'))
144 144 return []
145 return map(readnames, resphdr)
145 146
146 return map(readnames, resphdr)
147 @start_server
148 def debugquery(self):
149 cs, resphdr = self.query('DBUG', '')
150
151 nbytes = resphdr[0]
152 names = cs.read(nbytes)
153 return names.split('\0')
@@ -1,49 +1,51 b''
1 1 # server.py - inotify common protocol code
2 2 #
3 3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2, incorporated herein by reference.
8 8
9 9 import cStringIO, socket, struct
10 10
11 11 """
12 12 Protocol between inotify clients and server:
13 13
14 14 Client sending query:
15 15 1) send protocol version number
16 16 2) send query type (string, 4 letters long)
17 17 3) send query parameters:
18 18 - For STAT, N+1 \0-separated strings:
19 19 1) N different names that need checking
20 20 2) 1 string containing all the status types to match
21 - No parameter needed for DBUG
21 22
22 23 Server sending query answer:
23 24 1) send protocol version number
24 25 2) send query type
25 26 3) send struct.pack'ed headers describing the length of the content:
26 27 e.g. for STAT, receive 8 integers describing the length of the
27 28 8 \0-separated string lists ( one list for each lmar!?ic status type )
28 29
29 30 """
30 31
31 32 version = 2
32 33
33 34 resphdrfmts = {
34 'STAT': '>llllllll' # status requests
35 'STAT': '>llllllll', # status requests
36 'DBUG': '>l' # debugging queries
35 37 }
36 38 resphdrsizes = dict((k, struct.calcsize(v))
37 39 for k, v in resphdrfmts.iteritems())
38 40
39 41 def recvcs(sock):
40 42 cs = cStringIO.StringIO()
41 43 s = True
42 44 try:
43 45 while s:
44 46 s = sock.recv(65536)
45 47 cs.write(s)
46 48 finally:
47 49 sock.shutdown(socket.SHUT_RD)
48 50 cs.seek(0)
49 51 return cs
@@ -1,766 +1,778 b''
1 1 # server.py - inotify status server
2 2 #
3 3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2, incorporated herein by reference.
8 8
9 9 from mercurial.i18n import _
10 10 from mercurial import osutil, util
11 11 import common
12 12 import errno, os, select, socket, stat, struct, sys, tempfile, time
13 13
14 14 try:
15 15 import linux as inotify
16 16 from linux import watcher
17 17 except ImportError:
18 18 raise
19 19
20 20 class AlreadyStartedException(Exception): pass
21 21
22 22 def join(a, b):
23 23 if a:
24 24 if a[-1] == '/':
25 25 return a + b
26 26 return a + '/' + b
27 27 return b
28 28
29 29 walk_ignored_errors = (errno.ENOENT, errno.ENAMETOOLONG)
30 30
31 31 def walkrepodirs(repo):
32 32 '''Iterate over all subdirectories of this repo.
33 33 Exclude the .hg directory, any nested repos, and ignored dirs.'''
34 34 rootslash = repo.root + os.sep
35 35
36 36 def walkit(dirname, top):
37 37 fullpath = rootslash + dirname
38 38 try:
39 39 for name, kind in osutil.listdir(fullpath):
40 40 if kind == stat.S_IFDIR:
41 41 if name == '.hg':
42 42 if not top:
43 43 return
44 44 else:
45 45 d = join(dirname, name)
46 46 if repo.dirstate._ignore(d):
47 47 continue
48 48 for subdir in walkit(d, False):
49 49 yield subdir
50 50 except OSError, err:
51 51 if err.errno not in walk_ignored_errors:
52 52 raise
53 53 yield fullpath
54 54
55 55 return walkit('', True)
56 56
57 57 def walk(repo, root):
58 58 '''Like os.walk, but only yields regular files.'''
59 59
60 60 # This function is critical to performance during startup.
61 61
62 62 rootslash = repo.root + os.sep
63 63
64 64 def walkit(root, reporoot):
65 65 files, dirs = [], []
66 66
67 67 try:
68 68 fullpath = rootslash + root
69 69 for name, kind in osutil.listdir(fullpath):
70 70 if kind == stat.S_IFDIR:
71 71 if name == '.hg':
72 72 if not reporoot:
73 73 return
74 74 else:
75 75 dirs.append(name)
76 76 path = join(root, name)
77 77 if repo.dirstate._ignore(path):
78 78 continue
79 79 for result in walkit(path, False):
80 80 yield result
81 81 elif kind in (stat.S_IFREG, stat.S_IFLNK):
82 82 files.append(name)
83 83 yield fullpath, dirs, files
84 84
85 85 except OSError, err:
86 86 if err.errno not in walk_ignored_errors:
87 87 raise
88 88
89 89 return walkit(root, root == '')
90 90
91 91 def _explain_watch_limit(ui, repo, count):
92 92 path = '/proc/sys/fs/inotify/max_user_watches'
93 93 try:
94 94 limit = int(file(path).read())
95 95 except IOError, err:
96 96 if err.errno != errno.ENOENT:
97 97 raise
98 98 raise util.Abort(_('this system does not seem to '
99 99 'support inotify'))
100 100 ui.warn(_('*** the current per-user limit on the number '
101 101 'of inotify watches is %s\n') % limit)
102 102 ui.warn(_('*** this limit is too low to watch every '
103 103 'directory in this repository\n'))
104 104 ui.warn(_('*** counting directories: '))
105 105 ndirs = len(list(walkrepodirs(repo)))
106 106 ui.warn(_('found %d\n') % ndirs)
107 107 newlimit = min(limit, 1024)
108 108 while newlimit < ((limit + ndirs) * 1.1):
109 109 newlimit *= 2
110 110 ui.warn(_('*** to raise the limit from %d to %d (run as root):\n') %
111 111 (limit, newlimit))
112 112 ui.warn(_('*** echo %d > %s\n') % (newlimit, path))
113 113 raise util.Abort(_('cannot watch %s until inotify watch limit is raised')
114 114 % repo.root)
115 115
116 116 class repowatcher(object):
117 117 poll_events = select.POLLIN
118 118 statuskeys = 'almr!?'
119 119 mask = (
120 120 inotify.IN_ATTRIB |
121 121 inotify.IN_CREATE |
122 122 inotify.IN_DELETE |
123 123 inotify.IN_DELETE_SELF |
124 124 inotify.IN_MODIFY |
125 125 inotify.IN_MOVED_FROM |
126 126 inotify.IN_MOVED_TO |
127 127 inotify.IN_MOVE_SELF |
128 128 inotify.IN_ONLYDIR |
129 129 inotify.IN_UNMOUNT |
130 130 0)
131 131
132 132 def __init__(self, ui, repo, master):
133 133 self.ui = ui
134 134 self.repo = repo
135 135 self.wprefix = self.repo.wjoin('')
136 136 self.timeout = None
137 137 self.master = master
138 138 try:
139 139 self.watcher = watcher.watcher()
140 140 except OSError, err:
141 141 raise util.Abort(_('inotify service not available: %s') %
142 142 err.strerror)
143 143 self.threshold = watcher.threshold(self.watcher)
144 144 self.registered = True
145 145 self.fileno = self.watcher.fileno
146 146
147 147 self.repo.dirstate.__class__.inotifyserver = True
148 148
149 149 self.tree = {}
150 150 self.statcache = {}
151 151 self.statustrees = dict([(s, {}) for s in self.statuskeys])
152 152
153 153 self.watches = 0
154 154 self.last_event = None
155 155
156 156 self.eventq = {}
157 157 self.deferred = 0
158 158
159 159 self.ds_info = self.dirstate_info()
160 160 self.scan()
161 161
162 162 def event_time(self):
163 163 last = self.last_event
164 164 now = time.time()
165 165 self.last_event = now
166 166
167 167 if last is None:
168 168 return 'start'
169 169 delta = now - last
170 170 if delta < 5:
171 171 return '+%.3f' % delta
172 172 if delta < 50:
173 173 return '+%.2f' % delta
174 174 return '+%.1f' % delta
175 175
176 176 def dirstate_info(self):
177 177 try:
178 178 st = os.lstat(self.repo.join('dirstate'))
179 179 return st.st_mtime, st.st_ino
180 180 except OSError, err:
181 181 if err.errno != errno.ENOENT:
182 182 raise
183 183 return 0, 0
184 184
185 185 def add_watch(self, path, mask):
186 186 if not path:
187 187 return
188 188 if self.watcher.path(path) is None:
189 189 if self.ui.debugflag:
190 190 self.ui.note(_('watching %r\n') % path[len(self.wprefix):])
191 191 try:
192 192 self.watcher.add(path, mask)
193 193 self.watches += 1
194 194 except OSError, err:
195 195 if err.errno in (errno.ENOENT, errno.ENOTDIR):
196 196 return
197 197 if err.errno != errno.ENOSPC:
198 198 raise
199 199 _explain_watch_limit(self.ui, self.repo, self.watches)
200 200
201 201 def setup(self):
202 202 self.ui.note(_('watching directories under %r\n') % self.repo.root)
203 203 self.add_watch(self.repo.path, inotify.IN_DELETE)
204 204 self.check_dirstate()
205 205
206 206 def wpath(self, evt):
207 207 path = evt.fullpath
208 208 if path == self.repo.root:
209 209 return ''
210 210 if path.startswith(self.wprefix):
211 211 return path[len(self.wprefix):]
212 212 raise 'wtf? ' + path
213 213
214 214 def dir(self, tree, path):
215 215 if path:
216 216 for name in path.split('/'):
217 217 tree = tree.setdefault(name, {})
218 218 return tree
219 219
220 220 def lookup(self, path, tree):
221 221 if path:
222 222 try:
223 223 for name in path.split('/'):
224 224 tree = tree[name]
225 225 except KeyError:
226 226 return 'x'
227 227 except TypeError:
228 228 return 'd'
229 229 return tree
230 230
231 231 def split(self, path):
232 232 c = path.rfind('/')
233 233 if c == -1:
234 234 return '', path
235 235 return path[:c], path[c+1:]
236 236
237 237 def filestatus(self, fn, st):
238 238 try:
239 239 type_, mode, size, time = self.repo.dirstate._map[fn][:4]
240 240 except KeyError:
241 241 type_ = '?'
242 242 if type_ == 'n':
243 243 st_mode, st_size, st_mtime = st
244 244 if size == -1:
245 245 return 'l'
246 246 if size and (size != st_size or (mode ^ st_mode) & 0100):
247 247 return 'm'
248 248 if time != int(st_mtime):
249 249 return 'l'
250 250 return 'n'
251 251 if type_ == '?' and self.repo.dirstate._ignore(fn):
252 252 return 'i'
253 253 return type_
254 254
255 255 def updatestatus(self, wfn, osstat=None, newstatus=None):
256 256 '''
257 257 Update the stored status of a file or directory.
258 258
259 259 osstat: (mode, size, time) tuple, as returned by os.lstat(wfn)
260 260
261 261 newstatus: char in statuskeys, new status to apply.
262 262 '''
263 263 if osstat:
264 264 newstatus = self.filestatus(wfn, osstat)
265 265 else:
266 266 self.statcache.pop(wfn, None)
267 267 root, fn = self.split(wfn)
268 268 d = self.dir(self.tree, root)
269 269 oldstatus = d.get(fn)
270 270 isdir = False
271 271 if oldstatus:
272 272 try:
273 273 if not newstatus:
274 274 if oldstatus in 'almn':
275 275 newstatus = '!'
276 276 elif oldstatus == 'r':
277 277 newstatus = 'r'
278 278 except TypeError:
279 279 # oldstatus may be a dict left behind by a deleted
280 280 # directory
281 281 isdir = True
282 282 else:
283 283 if oldstatus in self.statuskeys and oldstatus != newstatus:
284 284 del self.dir(self.statustrees[oldstatus], root)[fn]
285 285 if self.ui.debugflag and oldstatus != newstatus:
286 286 if isdir:
287 287 self.ui.note(_('status: %r dir(%d) -> %s\n') %
288 288 (wfn, len(oldstatus), newstatus))
289 289 else:
290 290 self.ui.note(_('status: %r %s -> %s\n') %
291 291 (wfn, oldstatus, newstatus))
292 292 if not isdir:
293 293 if newstatus and newstatus != 'i':
294 294 d[fn] = newstatus
295 295 if newstatus in self.statuskeys:
296 296 dd = self.dir(self.statustrees[newstatus], root)
297 297 if oldstatus != newstatus or fn not in dd:
298 298 dd[fn] = newstatus
299 299 else:
300 300 d.pop(fn, None)
301 301 elif not newstatus:
302 302 # a directory is being removed, check its contents
303 303 for subfile, b in oldstatus.copy().iteritems():
304 304 self.updatestatus(wfn + '/' + subfile, None)
305 305
306 306
307 307 def check_deleted(self, key):
308 308 # Files that had been deleted but were present in the dirstate
309 309 # may have vanished from the dirstate; we must clean them up.
310 310 nuke = []
311 311 for wfn, ignore in self.walk(key, self.statustrees[key]):
312 312 if wfn not in self.repo.dirstate:
313 313 nuke.append(wfn)
314 314 for wfn in nuke:
315 315 root, fn = self.split(wfn)
316 316 del self.dir(self.statustrees[key], root)[fn]
317 317 del self.dir(self.tree, root)[fn]
318 318
319 319 def scan(self, topdir=''):
320 320 self.handle_timeout()
321 321 ds = self.repo.dirstate._map.copy()
322 322 self.add_watch(join(self.repo.root, topdir), self.mask)
323 323 for root, dirs, files in walk(self.repo, topdir):
324 324 for d in dirs:
325 325 self.add_watch(join(root, d), self.mask)
326 326 wroot = root[len(self.wprefix):]
327 327 d = self.dir(self.tree, wroot)
328 328 for fn in files:
329 329 wfn = join(wroot, fn)
330 330 self.updatestatus(wfn, self.getstat(wfn))
331 331 ds.pop(wfn, None)
332 332 wtopdir = topdir
333 333 if wtopdir and wtopdir[-1] != '/':
334 334 wtopdir += '/'
335 335 for wfn, state in ds.iteritems():
336 336 if not wfn.startswith(wtopdir):
337 337 continue
338 338 try:
339 339 st = self.stat(wfn)
340 340 except OSError:
341 341 status = state[0]
342 342 self.updatestatus(wfn, None, newstatus=status)
343 343 else:
344 344 self.updatestatus(wfn, st)
345 345 self.check_deleted('!')
346 346 self.check_deleted('r')
347 347
348 348 def check_dirstate(self):
349 349 ds_info = self.dirstate_info()
350 350 if ds_info == self.ds_info:
351 351 return
352 352 self.ds_info = ds_info
353 353 if not self.ui.debugflag:
354 354 self.last_event = None
355 355 self.ui.note(_('%s dirstate reload\n') % self.event_time())
356 356 self.repo.dirstate.invalidate()
357 357 self.scan()
358 358 self.ui.note(_('%s end dirstate reload\n') % self.event_time())
359 359
360 360 def walk(self, states, tree, prefix=''):
361 361 # This is the "inner loop" when talking to the client.
362 362
363 363 for name, val in tree.iteritems():
364 364 path = join(prefix, name)
365 365 try:
366 366 if val in states:
367 367 yield path, val
368 368 except TypeError:
369 369 for p in self.walk(states, val, path):
370 370 yield p
371 371
372 372 def update_hgignore(self):
373 373 # An update of the ignore file can potentially change the
374 374 # states of all unknown and ignored files.
375 375
376 376 # XXX If the user has other ignore files outside the repo, or
377 377 # changes their list of ignore files at run time, we'll
378 378 # potentially never see changes to them. We could get the
379 379 # client to report to us what ignore data they're using.
380 380 # But it's easier to do nothing than to open that can of
381 381 # worms.
382 382
383 383 if '_ignore' in self.repo.dirstate.__dict__:
384 384 delattr(self.repo.dirstate, '_ignore')
385 385 self.ui.note(_('rescanning due to .hgignore change\n'))
386 386 self.scan()
387 387
388 388 def getstat(self, wpath):
389 389 try:
390 390 return self.statcache[wpath]
391 391 except KeyError:
392 392 try:
393 393 return self.stat(wpath)
394 394 except OSError, err:
395 395 if err.errno != errno.ENOENT:
396 396 raise
397 397
398 398 def stat(self, wpath):
399 399 try:
400 400 st = os.lstat(join(self.wprefix, wpath))
401 401 ret = st.st_mode, st.st_size, st.st_mtime
402 402 self.statcache[wpath] = ret
403 403 return ret
404 404 except OSError:
405 405 self.statcache.pop(wpath, None)
406 406 raise
407 407
408 408 def created(self, wpath):
409 409 if wpath == '.hgignore':
410 410 self.update_hgignore()
411 411 try:
412 412 st = self.stat(wpath)
413 413 if stat.S_ISREG(st[0]):
414 414 self.updatestatus(wpath, st)
415 415 except OSError:
416 416 pass
417 417
418 418 def modified(self, wpath):
419 419 if wpath == '.hgignore':
420 420 self.update_hgignore()
421 421 try:
422 422 st = self.stat(wpath)
423 423 if stat.S_ISREG(st[0]):
424 424 if self.repo.dirstate[wpath] in 'lmn':
425 425 self.updatestatus(wpath, st)
426 426 except OSError:
427 427 pass
428 428
429 429 def deleted(self, wpath):
430 430 if wpath == '.hgignore':
431 431 self.update_hgignore()
432 432 elif wpath.startswith('.hg/'):
433 433 if wpath == '.hg/wlock':
434 434 self.check_dirstate()
435 435 return
436 436
437 437 self.updatestatus(wpath, None)
438 438
439 439 def schedule_work(self, wpath, evt):
440 440 prev = self.eventq.setdefault(wpath, [])
441 441 try:
442 442 if prev and evt == 'm' and prev[-1] in 'cm':
443 443 return
444 444 self.eventq[wpath].append(evt)
445 445 finally:
446 446 self.deferred += 1
447 447 self.timeout = 250
448 448
449 449 def deferred_event(self, wpath, evt):
450 450 if evt == 'c':
451 451 self.created(wpath)
452 452 elif evt == 'm':
453 453 self.modified(wpath)
454 454 elif evt == 'd':
455 455 self.deleted(wpath)
456 456
457 457 def process_create(self, wpath, evt):
458 458 if self.ui.debugflag:
459 459 self.ui.note(_('%s event: created %s\n') %
460 460 (self.event_time(), wpath))
461 461
462 462 if evt.mask & inotify.IN_ISDIR:
463 463 self.scan(wpath)
464 464 else:
465 465 self.schedule_work(wpath, 'c')
466 466
467 467 def process_delete(self, wpath, evt):
468 468 if self.ui.debugflag:
469 469 self.ui.note(_('%s event: deleted %s\n') %
470 470 (self.event_time(), wpath))
471 471
472 472 if evt.mask & inotify.IN_ISDIR:
473 473 self.scan(wpath)
474 474 self.schedule_work(wpath, 'd')
475 475
476 476 def process_modify(self, wpath, evt):
477 477 if self.ui.debugflag:
478 478 self.ui.note(_('%s event: modified %s\n') %
479 479 (self.event_time(), wpath))
480 480
481 481 if not (evt.mask & inotify.IN_ISDIR):
482 482 self.schedule_work(wpath, 'm')
483 483
484 484 def process_unmount(self, evt):
485 485 self.ui.warn(_('filesystem containing %s was unmounted\n') %
486 486 evt.fullpath)
487 487 sys.exit(0)
488 488
489 489 def handle_event(self, fd, event):
490 490 if self.ui.debugflag:
491 491 self.ui.note(_('%s readable: %d bytes\n') %
492 492 (self.event_time(), self.threshold.readable()))
493 493 if not self.threshold():
494 494 if self.registered:
495 495 if self.ui.debugflag:
496 496 self.ui.note(_('%s below threshold - unhooking\n') %
497 497 (self.event_time()))
498 498 self.master.poll.unregister(fd)
499 499 self.registered = False
500 500 self.timeout = 250
501 501 else:
502 502 self.read_events()
503 503
504 504 def read_events(self, bufsize=None):
505 505 events = self.watcher.read(bufsize)
506 506 if self.ui.debugflag:
507 507 self.ui.note(_('%s reading %d events\n') %
508 508 (self.event_time(), len(events)))
509 509 for evt in events:
510 510 wpath = self.wpath(evt)
511 511 if evt.mask & inotify.IN_UNMOUNT:
512 512 self.process_unmount(wpath, evt)
513 513 elif evt.mask & (inotify.IN_MODIFY | inotify.IN_ATTRIB):
514 514 self.process_modify(wpath, evt)
515 515 elif evt.mask & (inotify.IN_DELETE | inotify.IN_DELETE_SELF |
516 516 inotify.IN_MOVED_FROM):
517 517 self.process_delete(wpath, evt)
518 518 elif evt.mask & (inotify.IN_CREATE | inotify.IN_MOVED_TO):
519 519 self.process_create(wpath, evt)
520 520
521 521 def handle_timeout(self):
522 522 if not self.registered:
523 523 if self.ui.debugflag:
524 524 self.ui.note(_('%s hooking back up with %d bytes readable\n') %
525 525 (self.event_time(), self.threshold.readable()))
526 526 self.read_events(0)
527 527 self.master.poll.register(self, select.POLLIN)
528 528 self.registered = True
529 529
530 530 if self.eventq:
531 531 if self.ui.debugflag:
532 532 self.ui.note(_('%s processing %d deferred events as %d\n') %
533 533 (self.event_time(), self.deferred,
534 534 len(self.eventq)))
535 535 for wpath, evts in sorted(self.eventq.iteritems()):
536 536 for evt in evts:
537 537 self.deferred_event(wpath, evt)
538 538 self.eventq.clear()
539 539 self.deferred = 0
540 540 self.timeout = None
541 541
542 542 def shutdown(self):
543 543 self.watcher.close()
544 544
545 def debug(self):
546 """
547 Returns a sorted list of relatives paths currently watched,
548 for debugging purposes.
549 """
550 return sorted(tuple[0][len(self.wprefix):] for tuple in self.watcher)
551
545 552 class server(object):
546 553 poll_events = select.POLLIN
547 554
548 555 def __init__(self, ui, repo, repowatcher, timeout):
549 556 self.ui = ui
550 557 self.repo = repo
551 558 self.repowatcher = repowatcher
552 559 self.timeout = timeout
553 560 self.sock = socket.socket(socket.AF_UNIX)
554 561 self.sockpath = self.repo.join('inotify.sock')
555 562 self.realsockpath = None
556 563 try:
557 564 self.sock.bind(self.sockpath)
558 565 except socket.error, err:
559 566 if err[0] == errno.EADDRINUSE:
560 567 raise AlreadyStartedException(_('could not start server: %s')
561 568 % err[1])
562 569 if err[0] == "AF_UNIX path too long":
563 570 tempdir = tempfile.mkdtemp(prefix="hg-inotify-")
564 571 self.realsockpath = os.path.join(tempdir, "inotify.sock")
565 572 try:
566 573 self.sock.bind(self.realsockpath)
567 574 os.symlink(self.realsockpath, self.sockpath)
568 575 except (OSError, socket.error), inst:
569 576 try:
570 577 os.unlink(self.realsockpath)
571 578 except:
572 579 pass
573 580 os.rmdir(tempdir)
574 581 if inst.errno == errno.EEXIST:
575 582 raise AlreadyStartedException(_('could not start server: %s')
576 583 % inst.strerror)
577 584 raise
578 585 else:
579 586 raise
580 587 self.sock.listen(5)
581 588 self.fileno = self.sock.fileno
582 589
583 590 def handle_timeout(self):
584 591 pass
585 592
586 593 def answer_stat_query(self, cs):
587 594 names = cs.read().split('\0')
588 595
589 596 states = names.pop()
590 597
591 598 self.ui.note(_('answering query for %r\n') % states)
592 599
593 600 if self.repowatcher.timeout:
594 601 # We got a query while a rescan is pending. Make sure we
595 602 # rescan before responding, or we could give back a wrong
596 603 # answer.
597 604 self.repowatcher.handle_timeout()
598 605
599 606 if not names:
600 607 def genresult(states, tree):
601 608 for fn, state in self.repowatcher.walk(states, tree):
602 609 yield fn
603 610 else:
604 611 def genresult(states, tree):
605 612 for fn in names:
606 613 l = self.repowatcher.lookup(fn, tree)
607 614 try:
608 615 if l in states:
609 616 yield fn
610 617 except TypeError:
611 618 for f, s in self.repowatcher.walk(states, l, fn):
612 619 yield f
613 620
614 621 return ['\0'.join(r) for r in [
615 622 genresult('l', self.repowatcher.statustrees['l']),
616 623 genresult('m', self.repowatcher.statustrees['m']),
617 624 genresult('a', self.repowatcher.statustrees['a']),
618 625 genresult('r', self.repowatcher.statustrees['r']),
619 626 genresult('!', self.repowatcher.statustrees['!']),
620 627 '?' in states
621 628 and genresult('?', self.repowatcher.statustrees['?'])
622 629 or [],
623 630 [],
624 631 'c' in states and genresult('n', self.repowatcher.tree) or [],
625 632 ]]
626 633
634 def answer_dbug_query(self):
635 return ['\0'.join(self.repowatcher.debug())]
636
627 637 def handle_event(self, fd, event):
628 638 sock, addr = self.sock.accept()
629 639
630 640 cs = common.recvcs(sock)
631 641 version = ord(cs.read(1))
632 642
633 643 if version != common.version:
634 644 self.ui.warn(_('received query from incompatible client '
635 645 'version %d\n') % version)
636 646 return
637 647
638 648 type = cs.read(4)
639 649
640 650 if type == 'STAT':
641 651 results = self.answer_stat_query(cs)
652 elif type == 'DBUG':
653 results = self.answer_dbug_query()
642 654 else:
643 655 self.ui.warn(_('unrecognized query type: %s\n') % type)
644 656 return
645 657
646 658 try:
647 659 try:
648 660 v = chr(common.version)
649 661
650 662 sock.sendall(v + type + struct.pack(common.resphdrfmts[type],
651 663 *map(len, results)))
652 664 sock.sendall(''.join(results))
653 665 finally:
654 666 sock.shutdown(socket.SHUT_WR)
655 667 except socket.error, err:
656 668 if err[0] != errno.EPIPE:
657 669 raise
658 670
659 671 def shutdown(self):
660 672 self.sock.close()
661 673 try:
662 674 os.unlink(self.sockpath)
663 675 if self.realsockpath:
664 676 os.unlink(self.realsockpath)
665 677 os.rmdir(os.path.dirname(self.realsockpath))
666 678 except OSError, err:
667 679 if err.errno != errno.ENOENT:
668 680 raise
669 681
670 682 class master(object):
671 683 def __init__(self, ui, repo, timeout=None):
672 684 self.ui = ui
673 685 self.repo = repo
674 686 self.poll = select.poll()
675 687 self.repowatcher = repowatcher(ui, repo, self)
676 688 self.server = server(ui, repo, self.repowatcher, timeout)
677 689 self.table = {}
678 690 for obj in (self.repowatcher, self.server):
679 691 fd = obj.fileno()
680 692 self.table[fd] = obj
681 693 self.poll.register(fd, obj.poll_events)
682 694
683 695 def register(self, fd, mask):
684 696 self.poll.register(fd, mask)
685 697
686 698 def shutdown(self):
687 699 for obj in self.table.itervalues():
688 700 obj.shutdown()
689 701
690 702 def run(self):
691 703 self.repowatcher.setup()
692 704 self.ui.note(_('finished setup\n'))
693 705 if os.getenv('TIME_STARTUP'):
694 706 sys.exit(0)
695 707 while True:
696 708 timeout = None
697 709 timeobj = None
698 710 for obj in self.table.itervalues():
699 711 if obj.timeout is not None and (timeout is None or obj.timeout < timeout):
700 712 timeout, timeobj = obj.timeout, obj
701 713 try:
702 714 if self.ui.debugflag:
703 715 if timeout is None:
704 716 self.ui.note(_('polling: no timeout\n'))
705 717 else:
706 718 self.ui.note(_('polling: %sms timeout\n') % timeout)
707 719 events = self.poll.poll(timeout)
708 720 except select.error, err:
709 721 if err[0] == errno.EINTR:
710 722 continue
711 723 raise
712 724 if events:
713 725 for fd, event in events:
714 726 self.table[fd].handle_event(fd, event)
715 727 elif timeobj:
716 728 timeobj.handle_timeout()
717 729
718 730 def start(ui, repo):
719 731 def closefds(ignore):
720 732 # (from python bug #1177468)
721 733 # close all inherited file descriptors
722 734 # Python 2.4.1 and later use /dev/urandom to seed the random module's RNG
723 735 # a file descriptor is kept internally as os._urandomfd (created on demand
724 736 # the first time os.urandom() is called), and should not be closed
725 737 try:
726 738 os.urandom(4)
727 739 urandom_fd = getattr(os, '_urandomfd', None)
728 740 except AttributeError:
729 741 urandom_fd = None
730 742 ignore.append(urandom_fd)
731 743 for fd in range(3, 256):
732 744 if fd in ignore:
733 745 continue
734 746 try:
735 747 os.close(fd)
736 748 except OSError:
737 749 pass
738 750
739 751 m = master(ui, repo)
740 752 sys.stdout.flush()
741 753 sys.stderr.flush()
742 754
743 755 pid = os.fork()
744 756 if pid:
745 757 return pid
746 758
747 759 closefds([m.server.fileno(), m.repowatcher.fileno()])
748 760 os.setsid()
749 761
750 762 fd = os.open('/dev/null', os.O_RDONLY)
751 763 os.dup2(fd, 0)
752 764 if fd > 0:
753 765 os.close(fd)
754 766
755 767 fd = os.open(ui.config('inotify', 'log', '/dev/null'),
756 768 os.O_RDWR | os.O_CREAT | os.O_TRUNC)
757 769 os.dup2(fd, 1)
758 770 os.dup2(fd, 2)
759 771 if fd > 2:
760 772 os.close(fd)
761 773
762 774 try:
763 775 m.run()
764 776 finally:
765 777 m.shutdown()
766 778 os._exit(0)
General Comments 0
You need to be logged in to leave comments. Login now