##// END OF EJS Templates
inotify: catch SignalInterrupt during shutdown (issue3351)...
Thomas Arendsen Hein -
r16354:9f98fe05 stable
parent child Browse files
Show More
@@ -1,441 +1,444 b''
1 1 # linuxserver.py - inotify status server for linux
2 2 #
3 3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from mercurial.i18n import _
10 from mercurial import osutil, util
10 from mercurial import osutil, util, error
11 11 import server
12 12 import errno, os, select, stat, sys, time
13 13
14 14 try:
15 15 import linux as inotify
16 16 from linux import watcher
17 17 except ImportError:
18 18 raise
19 19
20 20 def walkrepodirs(dirstate, absroot):
21 21 '''Iterate over all subdirectories of this repo.
22 22 Exclude the .hg directory, any nested repos, and ignored dirs.'''
23 23 def walkit(dirname, top):
24 24 fullpath = server.join(absroot, dirname)
25 25 try:
26 26 for name, kind in osutil.listdir(fullpath):
27 27 if kind == stat.S_IFDIR:
28 28 if name == '.hg':
29 29 if not top:
30 30 return
31 31 else:
32 32 d = server.join(dirname, name)
33 33 if dirstate._ignore(d):
34 34 continue
35 35 for subdir in walkit(d, False):
36 36 yield subdir
37 37 except OSError, err:
38 38 if err.errno not in server.walk_ignored_errors:
39 39 raise
40 40 yield fullpath
41 41
42 42 return walkit('', True)
43 43
44 44 def _explain_watch_limit(ui, dirstate, rootabs):
45 45 path = '/proc/sys/fs/inotify/max_user_watches'
46 46 try:
47 47 limit = int(util.readfile(path))
48 48 except IOError, err:
49 49 if err.errno != errno.ENOENT:
50 50 raise
51 51 raise util.Abort(_('this system does not seem to '
52 52 'support inotify'))
53 53 ui.warn(_('*** the current per-user limit on the number '
54 54 'of inotify watches is %s\n') % limit)
55 55 ui.warn(_('*** this limit is too low to watch every '
56 56 'directory in this repository\n'))
57 57 ui.warn(_('*** counting directories: '))
58 58 ndirs = len(list(walkrepodirs(dirstate, rootabs)))
59 59 ui.warn(_('found %d\n') % ndirs)
60 60 newlimit = min(limit, 1024)
61 61 while newlimit < ((limit + ndirs) * 1.1):
62 62 newlimit *= 2
63 63 ui.warn(_('*** to raise the limit from %d to %d (run as root):\n') %
64 64 (limit, newlimit))
65 65 ui.warn(_('*** echo %d > %s\n') % (newlimit, path))
66 66 raise util.Abort(_('cannot watch %s until inotify watch limit is raised')
67 67 % rootabs)
68 68
69 69 class pollable(object):
70 70 """
71 71 Interface to support polling.
72 72 The file descriptor returned by fileno() is registered to a polling
73 73 object.
74 74 Usage:
75 75 Every tick, check if an event has happened since the last tick:
76 76 * If yes, call handle_events
77 77 * If no, call handle_timeout
78 78 """
79 79 poll_events = select.POLLIN
80 80 instances = {}
81 81 poll = select.poll()
82 82
83 83 def fileno(self):
84 84 raise NotImplementedError
85 85
86 86 def handle_events(self, events):
87 87 raise NotImplementedError
88 88
89 89 def handle_timeout(self):
90 90 raise NotImplementedError
91 91
92 92 def shutdown(self):
93 93 raise NotImplementedError
94 94
95 95 def register(self, timeout):
96 96 fd = self.fileno()
97 97
98 98 pollable.poll.register(fd, pollable.poll_events)
99 99 pollable.instances[fd] = self
100 100
101 101 self.registered = True
102 102 self.timeout = timeout
103 103
104 104 def unregister(self):
105 105 pollable.poll.unregister(self)
106 106 self.registered = False
107 107
108 108 @classmethod
109 109 def run(cls):
110 110 while True:
111 111 timeout = None
112 112 timeobj = None
113 113 for obj in cls.instances.itervalues():
114 114 if obj.timeout is not None and (timeout is None
115 115 or obj.timeout < timeout):
116 116 timeout, timeobj = obj.timeout, obj
117 117 try:
118 118 events = cls.poll.poll(timeout)
119 119 except select.error, err:
120 120 if err.args[0] == errno.EINTR:
121 121 continue
122 122 raise
123 123 if events:
124 124 by_fd = {}
125 125 for fd, event in events:
126 126 by_fd.setdefault(fd, []).append(event)
127 127
128 128 for fd, events in by_fd.iteritems():
129 129 cls.instances[fd].handle_pollevents(events)
130 130
131 131 elif timeobj:
132 132 timeobj.handle_timeout()
133 133
134 134 def eventaction(code):
135 135 """
136 136 Decorator to help handle events in repowatcher
137 137 """
138 138 def decorator(f):
139 139 def wrapper(self, wpath):
140 140 if code == 'm' and wpath in self.lastevent and \
141 141 self.lastevent[wpath] in 'cm':
142 142 return
143 143 self.lastevent[wpath] = code
144 144 self.timeout = 250
145 145
146 146 f(self, wpath)
147 147
148 148 wrapper.func_name = f.func_name
149 149 return wrapper
150 150 return decorator
151 151
152 152 class repowatcher(server.repowatcher, pollable):
153 153 """
154 154 Watches inotify events
155 155 """
156 156 mask = (
157 157 inotify.IN_ATTRIB |
158 158 inotify.IN_CREATE |
159 159 inotify.IN_DELETE |
160 160 inotify.IN_DELETE_SELF |
161 161 inotify.IN_MODIFY |
162 162 inotify.IN_MOVED_FROM |
163 163 inotify.IN_MOVED_TO |
164 164 inotify.IN_MOVE_SELF |
165 165 inotify.IN_ONLYDIR |
166 166 inotify.IN_UNMOUNT |
167 167 0)
168 168
169 169 def __init__(self, ui, dirstate, root):
170 170 server.repowatcher.__init__(self, ui, dirstate, root)
171 171
172 172 self.lastevent = {}
173 173 self.dirty = False
174 174 try:
175 175 self.watcher = watcher.watcher()
176 176 except OSError, err:
177 177 raise util.Abort(_('inotify service not available: %s') %
178 178 err.strerror)
179 179 self.threshold = watcher.threshold(self.watcher)
180 180 self.fileno = self.watcher.fileno
181 181 self.register(timeout=None)
182 182
183 183 self.handle_timeout()
184 184 self.scan()
185 185
186 186 def event_time(self):
187 187 last = self.last_event
188 188 now = time.time()
189 189 self.last_event = now
190 190
191 191 if last is None:
192 192 return 'start'
193 193 delta = now - last
194 194 if delta < 5:
195 195 return '+%.3f' % delta
196 196 if delta < 50:
197 197 return '+%.2f' % delta
198 198 return '+%.1f' % delta
199 199
200 200 def add_watch(self, path, mask):
201 201 if not path:
202 202 return
203 203 if self.watcher.path(path) is None:
204 204 if self.ui.debugflag:
205 205 self.ui.note(_('watching %r\n') % path[self.prefixlen:])
206 206 try:
207 207 self.watcher.add(path, mask)
208 208 except OSError, err:
209 209 if err.errno in (errno.ENOENT, errno.ENOTDIR):
210 210 return
211 211 if err.errno != errno.ENOSPC:
212 212 raise
213 213 _explain_watch_limit(self.ui, self.dirstate, self.wprefix)
214 214
215 215 def setup(self):
216 216 self.ui.note(_('watching directories under %r\n') % self.wprefix)
217 217 self.add_watch(self.wprefix + '.hg', inotify.IN_DELETE)
218 218
219 219 def scan(self, topdir=''):
220 220 ds = self.dirstate._map.copy()
221 221 self.add_watch(server.join(self.wprefix, topdir), self.mask)
222 222 for root, dirs, files in server.walk(self.dirstate, self.wprefix,
223 223 topdir):
224 224 for d in dirs:
225 225 self.add_watch(server.join(root, d), self.mask)
226 226 wroot = root[self.prefixlen:]
227 227 for fn in files:
228 228 wfn = server.join(wroot, fn)
229 229 self.updatefile(wfn, self.getstat(wfn))
230 230 ds.pop(wfn, None)
231 231 wtopdir = topdir
232 232 if wtopdir and wtopdir[-1] != '/':
233 233 wtopdir += '/'
234 234 for wfn, state in ds.iteritems():
235 235 if not wfn.startswith(wtopdir):
236 236 continue
237 237 try:
238 238 st = self.stat(wfn)
239 239 except OSError:
240 240 status = state[0]
241 241 self.deletefile(wfn, status)
242 242 else:
243 243 self.updatefile(wfn, st)
244 244 self.check_deleted('!')
245 245 self.check_deleted('r')
246 246
247 247 @eventaction('c')
248 248 def created(self, wpath):
249 249 if wpath == '.hgignore':
250 250 self.update_hgignore()
251 251 try:
252 252 st = self.stat(wpath)
253 253 if stat.S_ISREG(st[0]) or stat.S_ISLNK(st[0]):
254 254 self.updatefile(wpath, st)
255 255 except OSError:
256 256 pass
257 257
258 258 @eventaction('m')
259 259 def modified(self, wpath):
260 260 if wpath == '.hgignore':
261 261 self.update_hgignore()
262 262 try:
263 263 st = self.stat(wpath)
264 264 if stat.S_ISREG(st[0]):
265 265 if self.dirstate[wpath] in 'lmn':
266 266 self.updatefile(wpath, st)
267 267 except OSError:
268 268 pass
269 269
270 270 @eventaction('d')
271 271 def deleted(self, wpath):
272 272 if wpath == '.hgignore':
273 273 self.update_hgignore()
274 274 elif wpath.startswith('.hg/'):
275 275 return
276 276
277 277 self.deletefile(wpath, self.dirstate[wpath])
278 278
279 279 def process_create(self, wpath, evt):
280 280 if self.ui.debugflag:
281 281 self.ui.note(_('%s event: created %s\n') %
282 282 (self.event_time(), wpath))
283 283
284 284 if evt.mask & inotify.IN_ISDIR:
285 285 self.scan(wpath)
286 286 else:
287 287 self.created(wpath)
288 288
289 289 def process_delete(self, wpath, evt):
290 290 if self.ui.debugflag:
291 291 self.ui.note(_('%s event: deleted %s\n') %
292 292 (self.event_time(), wpath))
293 293
294 294 if evt.mask & inotify.IN_ISDIR:
295 295 tree = self.tree.dir(wpath)
296 296 todelete = [wfn for wfn, ignore in tree.walk('?')]
297 297 for fn in todelete:
298 298 self.deletefile(fn, '?')
299 299 self.scan(wpath)
300 300 else:
301 301 self.deleted(wpath)
302 302
303 303 def process_modify(self, wpath, evt):
304 304 if self.ui.debugflag:
305 305 self.ui.note(_('%s event: modified %s\n') %
306 306 (self.event_time(), wpath))
307 307
308 308 if not (evt.mask & inotify.IN_ISDIR):
309 309 self.modified(wpath)
310 310
311 311 def process_unmount(self, evt):
312 312 self.ui.warn(_('filesystem containing %s was unmounted\n') %
313 313 evt.fullpath)
314 314 sys.exit(0)
315 315
316 316 def handle_pollevents(self, events):
317 317 if self.ui.debugflag:
318 318 self.ui.note(_('%s readable: %d bytes\n') %
319 319 (self.event_time(), self.threshold.readable()))
320 320 if not self.threshold():
321 321 if self.registered:
322 322 if self.ui.debugflag:
323 323 self.ui.note(_('%s below threshold - unhooking\n') %
324 324 (self.event_time()))
325 325 self.unregister()
326 326 self.timeout = 250
327 327 else:
328 328 self.read_events()
329 329
330 330 def read_events(self, bufsize=None):
331 331 events = self.watcher.read(bufsize)
332 332 if self.ui.debugflag:
333 333 self.ui.note(_('%s reading %d events\n') %
334 334 (self.event_time(), len(events)))
335 335 for evt in events:
336 336 if evt.fullpath == self.wprefix[:-1]:
337 337 # events on the root of the repository
338 338 # itself, e.g. permission changes or repository move
339 339 continue
340 340 assert evt.fullpath.startswith(self.wprefix)
341 341 wpath = evt.fullpath[self.prefixlen:]
342 342
343 343 # paths have been normalized, wpath never ends with a '/'
344 344
345 345 if wpath.startswith('.hg/') and evt.mask & inotify.IN_ISDIR:
346 346 # ignore subdirectories of .hg/ (merge, patches...)
347 347 continue
348 348 if wpath == ".hg/wlock":
349 349 if evt.mask & inotify.IN_DELETE:
350 350 self.dirstate.invalidate()
351 351 self.dirty = False
352 352 self.scan()
353 353 elif evt.mask & inotify.IN_CREATE:
354 354 self.dirty = True
355 355 else:
356 356 if self.dirty:
357 357 continue
358 358
359 359 if evt.mask & inotify.IN_UNMOUNT:
360 360 self.process_unmount(wpath, evt)
361 361 elif evt.mask & (inotify.IN_MODIFY | inotify.IN_ATTRIB):
362 362 self.process_modify(wpath, evt)
363 363 elif evt.mask & (inotify.IN_DELETE | inotify.IN_DELETE_SELF |
364 364 inotify.IN_MOVED_FROM):
365 365 self.process_delete(wpath, evt)
366 366 elif evt.mask & (inotify.IN_CREATE | inotify.IN_MOVED_TO):
367 367 self.process_create(wpath, evt)
368 368
369 369 self.lastevent.clear()
370 370
371 371 def handle_timeout(self):
372 372 if not self.registered:
373 373 if self.ui.debugflag:
374 374 self.ui.note(_('%s hooking back up with %d bytes readable\n') %
375 375 (self.event_time(), self.threshold.readable()))
376 376 self.read_events(0)
377 377 self.register(timeout=None)
378 378
379 379 self.timeout = None
380 380
381 381 def shutdown(self):
382 382 self.watcher.close()
383 383
384 384 def debug(self):
385 385 """
386 386 Returns a sorted list of relatives paths currently watched,
387 387 for debugging purposes.
388 388 """
389 389 return sorted(tuple[0][self.prefixlen:] for tuple in self.watcher)
390 390
391 391 class socketlistener(server.socketlistener, pollable):
392 392 """
393 393 Listens for client queries on unix socket inotify.sock
394 394 """
395 395 def __init__(self, ui, root, repowatcher, timeout):
396 396 server.socketlistener.__init__(self, ui, root, repowatcher, timeout)
397 397 self.register(timeout=timeout)
398 398
399 399 def handle_timeout(self):
400 400 raise server.TimeoutException
401 401
402 402 def handle_pollevents(self, events):
403 403 for e in events:
404 404 self.accept_connection()
405 405
406 406 def shutdown(self):
407 407 self.sock.close()
408 408 try:
409 409 os.unlink(self.sockpath)
410 410 if self.realsockpath:
411 411 os.unlink(self.realsockpath)
412 412 os.rmdir(os.path.dirname(self.realsockpath))
413 413 except OSError, err:
414 414 if err.errno != errno.ENOENT:
415 415 raise
416 416
417 417 def answer_stat_query(self, cs):
418 418 if self.repowatcher.timeout:
419 419 # We got a query while a rescan is pending. Make sure we
420 420 # rescan before responding, or we could give back a wrong
421 421 # answer.
422 422 self.repowatcher.handle_timeout()
423 423 return server.socketlistener.answer_stat_query(self, cs)
424 424
425 425 class master(object):
426 426 def __init__(self, ui, dirstate, root, timeout=None):
427 427 self.ui = ui
428 428 self.repowatcher = repowatcher(ui, dirstate, root)
429 429 self.socketlistener = socketlistener(ui, root, self.repowatcher,
430 430 timeout)
431 431
432 432 def shutdown(self):
433 433 for obj in pollable.instances.itervalues():
434 obj.shutdown()
434 try:
435 obj.shutdown()
436 except error.SignalInterrupt:
437 pass
435 438
436 439 def run(self):
437 440 self.repowatcher.setup()
438 441 self.ui.note(_('finished setup\n'))
439 442 if os.getenv('TIME_STARTUP'):
440 443 sys.exit(0)
441 444 pollable.run()
General Comments 0
You need to be logged in to leave comments. Login now