##// END OF EJS Templates
inotify: follow new files if they are regular or a symlink....
Nicolas Dumazet -
r10089:8fab3172 default
parent child Browse files
Show More
@@ -1,429 +1,429 b''
1 1 # linuxserver.py - inotify status server for linux
2 2 #
3 3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2, incorporated herein by reference.
8 8
9 9 from mercurial.i18n import _
10 10 from mercurial import osutil, util
11 11 import common
12 12 import server
13 13 import errno, os, select, stat, sys, time
14 14
15 15 try:
16 16 import linux as inotify
17 17 from linux import watcher
18 18 except ImportError:
19 19 raise
20 20
21 21 def walkrepodirs(dirstate, absroot):
22 22 '''Iterate over all subdirectories of this repo.
23 23 Exclude the .hg directory, any nested repos, and ignored dirs.'''
24 24 def walkit(dirname, top):
25 25 fullpath = server.join(absroot, dirname)
26 26 try:
27 27 for name, kind in osutil.listdir(fullpath):
28 28 if kind == stat.S_IFDIR:
29 29 if name == '.hg':
30 30 if not top:
31 31 return
32 32 else:
33 33 d = server.join(dirname, name)
34 34 if dirstate._ignore(d):
35 35 continue
36 36 for subdir in walkit(d, False):
37 37 yield subdir
38 38 except OSError, err:
39 39 if err.errno not in server.walk_ignored_errors:
40 40 raise
41 41 yield fullpath
42 42
43 43 return walkit('', True)
44 44
45 45 def _explain_watch_limit(ui, dirstate, rootabs):
46 46 path = '/proc/sys/fs/inotify/max_user_watches'
47 47 try:
48 48 limit = int(file(path).read())
49 49 except IOError, err:
50 50 if err.errno != errno.ENOENT:
51 51 raise
52 52 raise util.Abort(_('this system does not seem to '
53 53 'support inotify'))
54 54 ui.warn(_('*** the current per-user limit on the number '
55 55 'of inotify watches is %s\n') % limit)
56 56 ui.warn(_('*** this limit is too low to watch every '
57 57 'directory in this repository\n'))
58 58 ui.warn(_('*** counting directories: '))
59 59 ndirs = len(list(walkrepodirs(dirstate, rootabs)))
60 60 ui.warn(_('found %d\n') % ndirs)
61 61 newlimit = min(limit, 1024)
62 62 while newlimit < ((limit + ndirs) * 1.1):
63 63 newlimit *= 2
64 64 ui.warn(_('*** to raise the limit from %d to %d (run as root):\n') %
65 65 (limit, newlimit))
66 66 ui.warn(_('*** echo %d > %s\n') % (newlimit, path))
67 67 raise util.Abort(_('cannot watch %s until inotify watch limit is raised')
68 68 % rootabs)
69 69
70 70 class pollable(object):
71 71 """
72 72 Interface to support polling.
73 73 The file descriptor returned by fileno() is registered to a polling
74 74 object.
75 75 Usage:
76 76 Every tick, check if an event has happened since the last tick:
77 77 * If yes, call handle_events
78 78 * If no, call handle_timeout
79 79 """
80 80 poll_events = select.POLLIN
81 81 instances = {}
82 82 poll = select.poll()
83 83
84 84 def fileno(self):
85 85 raise NotImplementedError
86 86
87 87 def handle_events(self, events):
88 88 raise NotImplementedError
89 89
90 90 def handle_timeout(self):
91 91 raise NotImplementedError
92 92
93 93 def shutdown(self):
94 94 raise NotImplementedError
95 95
96 96 def register(self, timeout):
97 97 fd = self.fileno()
98 98
99 99 pollable.poll.register(fd, pollable.poll_events)
100 100 pollable.instances[fd] = self
101 101
102 102 self.registered = True
103 103 self.timeout = timeout
104 104
105 105 def unregister(self):
106 106 pollable.poll.unregister(self)
107 107 self.registered = False
108 108
109 109 @classmethod
110 110 def run(cls):
111 111 while True:
112 112 timeout = None
113 113 timeobj = None
114 114 for obj in cls.instances.itervalues():
115 115 if obj.timeout is not None and (timeout is None or obj.timeout < timeout):
116 116 timeout, timeobj = obj.timeout, obj
117 117 try:
118 118 events = cls.poll.poll(timeout)
119 119 except select.error, err:
120 120 if err[0] == errno.EINTR:
121 121 continue
122 122 raise
123 123 if events:
124 124 by_fd = {}
125 125 for fd, event in events:
126 126 by_fd.setdefault(fd, []).append(event)
127 127
128 128 for fd, events in by_fd.iteritems():
129 129 cls.instances[fd].handle_pollevents(events)
130 130
131 131 elif timeobj:
132 132 timeobj.handle_timeout()
133 133
134 134 def eventaction(code):
135 135 """
136 136 Decorator to help handle events in repowatcher
137 137 """
138 138 def decorator(f):
139 139 def wrapper(self, wpath):
140 140 if code == 'm' and wpath in self.lastevent and \
141 141 self.lastevent[wpath] in 'cm':
142 142 return
143 143 self.lastevent[wpath] = code
144 144 self.timeout = 250
145 145
146 146 f(self, wpath)
147 147
148 148 wrapper.func_name = f.func_name
149 149 return wrapper
150 150 return decorator
151 151
152 152 class repowatcher(server.repowatcher, pollable):
153 153 """
154 154 Watches inotify events
155 155 """
156 156 mask = (
157 157 inotify.IN_ATTRIB |
158 158 inotify.IN_CREATE |
159 159 inotify.IN_DELETE |
160 160 inotify.IN_DELETE_SELF |
161 161 inotify.IN_MODIFY |
162 162 inotify.IN_MOVED_FROM |
163 163 inotify.IN_MOVED_TO |
164 164 inotify.IN_MOVE_SELF |
165 165 inotify.IN_ONLYDIR |
166 166 inotify.IN_UNMOUNT |
167 167 0)
168 168
169 169 def __init__(self, ui, dirstate, root):
170 170 server.repowatcher.__init__(self, ui, dirstate, root)
171 171
172 172 self.lastevent = {}
173 173 try:
174 174 self.watcher = watcher.watcher()
175 175 except OSError, err:
176 176 raise util.Abort(_('inotify service not available: %s') %
177 177 err.strerror)
178 178 self.threshold = watcher.threshold(self.watcher)
179 179 self.fileno = self.watcher.fileno
180 180 self.register(timeout=None)
181 181
182 182 self.handle_timeout()
183 183 self.scan()
184 184
185 185 def event_time(self):
186 186 last = self.last_event
187 187 now = time.time()
188 188 self.last_event = now
189 189
190 190 if last is None:
191 191 return 'start'
192 192 delta = now - last
193 193 if delta < 5:
194 194 return '+%.3f' % delta
195 195 if delta < 50:
196 196 return '+%.2f' % delta
197 197 return '+%.1f' % delta
198 198
199 199 def add_watch(self, path, mask):
200 200 if not path:
201 201 return
202 202 if self.watcher.path(path) is None:
203 203 if self.ui.debugflag:
204 204 self.ui.note(_('watching %r\n') % path[self.prefixlen:])
205 205 try:
206 206 self.watcher.add(path, mask)
207 207 except OSError, err:
208 208 if err.errno in (errno.ENOENT, errno.ENOTDIR):
209 209 return
210 210 if err.errno != errno.ENOSPC:
211 211 raise
212 212 _explain_watch_limit(self.ui, self.dirstate, self.wprefix)
213 213
214 214 def setup(self):
215 215 self.ui.note(_('watching directories under %r\n') % self.wprefix)
216 216 self.add_watch(self.wprefix + '.hg', inotify.IN_DELETE)
217 217 self.check_dirstate()
218 218
219 219 def scan(self, topdir=''):
220 220 ds = self.dirstate._map.copy()
221 221 self.add_watch(server.join(self.wprefix, topdir), self.mask)
222 222 for root, dirs, files in server.walk(self.dirstate, self.wprefix,
223 223 topdir):
224 224 for d in dirs:
225 225 self.add_watch(server.join(root, d), self.mask)
226 226 wroot = root[self.prefixlen:]
227 227 for fn in files:
228 228 wfn = server.join(wroot, fn)
229 229 self.updatefile(wfn, self.getstat(wfn))
230 230 ds.pop(wfn, None)
231 231 wtopdir = topdir
232 232 if wtopdir and wtopdir[-1] != '/':
233 233 wtopdir += '/'
234 234 for wfn, state in ds.iteritems():
235 235 if not wfn.startswith(wtopdir):
236 236 continue
237 237 try:
238 238 st = self.stat(wfn)
239 239 except OSError:
240 240 status = state[0]
241 241 self.deletefile(wfn, status)
242 242 else:
243 243 self.updatefile(wfn, st)
244 244 self.check_deleted('!')
245 245 self.check_deleted('r')
246 246
247 247 @eventaction('c')
248 248 def created(self, wpath):
249 249 if wpath == '.hgignore':
250 250 self.update_hgignore()
251 251 try:
252 252 st = self.stat(wpath)
253 if stat.S_ISREG(st[0]):
253 if stat.S_ISREG(st[0]) or stat.S_ISLNK(st[0]):
254 254 self.updatefile(wpath, st)
255 255 except OSError:
256 256 pass
257 257
258 258 @eventaction('m')
259 259 def modified(self, wpath):
260 260 if wpath == '.hgignore':
261 261 self.update_hgignore()
262 262 try:
263 263 st = self.stat(wpath)
264 264 if stat.S_ISREG(st[0]):
265 265 if self.dirstate[wpath] in 'lmn':
266 266 self.updatefile(wpath, st)
267 267 except OSError:
268 268 pass
269 269
270 270 @eventaction('d')
271 271 def deleted(self, wpath):
272 272 if wpath == '.hgignore':
273 273 self.update_hgignore()
274 274 elif wpath.startswith('.hg/'):
275 275 if wpath == '.hg/wlock':
276 276 self.check_dirstate()
277 277 return
278 278
279 279 self.deletefile(wpath, self.dirstate[wpath])
280 280
281 281 def process_create(self, wpath, evt):
282 282 if self.ui.debugflag:
283 283 self.ui.note(_('%s event: created %s\n') %
284 284 (self.event_time(), wpath))
285 285
286 286 if evt.mask & inotify.IN_ISDIR:
287 287 self.scan(wpath)
288 288 else:
289 289 self.created(wpath)
290 290
291 291 def process_delete(self, wpath, evt):
292 292 if self.ui.debugflag:
293 293 self.ui.note(_('%s event: deleted %s\n') %
294 294 (self.event_time(), wpath))
295 295
296 296 if evt.mask & inotify.IN_ISDIR:
297 297 tree = self.tree.dir(wpath)
298 298 todelete = [wfn for wfn, ignore in tree.walk('?')]
299 299 for fn in todelete:
300 300 self.deletefile(fn, '?')
301 301 self.scan(wpath)
302 302 else:
303 303 self.deleted(wpath)
304 304
305 305 def process_modify(self, wpath, evt):
306 306 if self.ui.debugflag:
307 307 self.ui.note(_('%s event: modified %s\n') %
308 308 (self.event_time(), wpath))
309 309
310 310 if not (evt.mask & inotify.IN_ISDIR):
311 311 self.modified(wpath)
312 312
313 313 def process_unmount(self, evt):
314 314 self.ui.warn(_('filesystem containing %s was unmounted\n') %
315 315 evt.fullpath)
316 316 sys.exit(0)
317 317
318 318 def handle_pollevents(self, events):
319 319 if self.ui.debugflag:
320 320 self.ui.note(_('%s readable: %d bytes\n') %
321 321 (self.event_time(), self.threshold.readable()))
322 322 if not self.threshold():
323 323 if self.registered:
324 324 if self.ui.debugflag:
325 325 self.ui.note(_('%s below threshold - unhooking\n') %
326 326 (self.event_time()))
327 327 self.unregister()
328 328 self.timeout = 250
329 329 else:
330 330 self.read_events()
331 331
332 332 def read_events(self, bufsize=None):
333 333 events = self.watcher.read(bufsize)
334 334 if self.ui.debugflag:
335 335 self.ui.note(_('%s reading %d events\n') %
336 336 (self.event_time(), len(events)))
337 337 for evt in events:
338 338 assert evt.fullpath.startswith(self.wprefix)
339 339 wpath = evt.fullpath[self.prefixlen:]
340 340
341 341 # paths have been normalized, wpath never ends with a '/'
342 342
343 343 if wpath.startswith('.hg/') and evt.mask & inotify.IN_ISDIR:
344 344 # ignore subdirectories of .hg/ (merge, patches...)
345 345 continue
346 346
347 347 if evt.mask & inotify.IN_UNMOUNT:
348 348 self.process_unmount(wpath, evt)
349 349 elif evt.mask & (inotify.IN_MODIFY | inotify.IN_ATTRIB):
350 350 self.process_modify(wpath, evt)
351 351 elif evt.mask & (inotify.IN_DELETE | inotify.IN_DELETE_SELF |
352 352 inotify.IN_MOVED_FROM):
353 353 self.process_delete(wpath, evt)
354 354 elif evt.mask & (inotify.IN_CREATE | inotify.IN_MOVED_TO):
355 355 self.process_create(wpath, evt)
356 356
357 357 self.lastevent.clear()
358 358
359 359 def handle_timeout(self):
360 360 if not self.registered:
361 361 if self.ui.debugflag:
362 362 self.ui.note(_('%s hooking back up with %d bytes readable\n') %
363 363 (self.event_time(), self.threshold.readable()))
364 364 self.read_events(0)
365 365 self.register(timeout=None)
366 366
367 367 self.timeout = None
368 368
369 369 def shutdown(self):
370 370 self.watcher.close()
371 371
372 372 def debug(self):
373 373 """
374 374 Returns a sorted list of relatives paths currently watched,
375 375 for debugging purposes.
376 376 """
377 377 return sorted(tuple[0][self.prefixlen:] for tuple in self.watcher)
378 378
379 379 class socketlistener(server.socketlistener, pollable):
380 380 """
381 381 Listens for client queries on unix socket inotify.sock
382 382 """
383 383 def __init__(self, ui, root, repowatcher, timeout):
384 384 server.socketlistener.__init__(self, ui, root, repowatcher, timeout)
385 385 self.register(timeout=timeout)
386 386
387 387 def handle_timeout(self):
388 388 pass
389 389
390 390 def handle_pollevents(self, events):
391 391 for e in events:
392 392 self.accept_connection()
393 393
394 394 def shutdown(self):
395 395 self.sock.close()
396 396 try:
397 397 os.unlink(self.sockpath)
398 398 if self.realsockpath:
399 399 os.unlink(self.realsockpath)
400 400 os.rmdir(os.path.dirname(self.realsockpath))
401 401 except OSError, err:
402 402 if err.errno != errno.ENOENT:
403 403 raise
404 404
405 405 def answer_stat_query(self, cs):
406 406 if self.repowatcher.timeout:
407 407 # We got a query while a rescan is pending. Make sure we
408 408 # rescan before responding, or we could give back a wrong
409 409 # answer.
410 410 self.repowatcher.handle_timeout()
411 411 return server.socketlistener.answer_stat_query(self, cs)
412 412
413 413 class master(object):
414 414 def __init__(self, ui, dirstate, root, timeout=None):
415 415 self.ui = ui
416 416 self.repowatcher = repowatcher(ui, dirstate, root)
417 417 self.socketlistener = socketlistener(ui, root, self.repowatcher,
418 418 timeout)
419 419
420 420 def shutdown(self):
421 421 for obj in pollable.instances.itervalues():
422 422 obj.shutdown()
423 423
424 424 def run(self):
425 425 self.repowatcher.setup()
426 426 self.ui.note(_('finished setup\n'))
427 427 if os.getenv('TIME_STARTUP'):
428 428 sys.exit(0)
429 429 pollable.run()
General Comments 0
You need to be logged in to leave comments. Login now