##// END OF EJS Templates
inotify: create a common, OS-independent server entry point...
Nicolas Dumazet -
r9933:2e790215 default
parent child Browse files
Show More
@@ -1,869 +1,429 b''
1 # server.py - inotify status server
1 # linuxserver.py - inotify status server for linux
2 2 #
3 3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2, incorporated herein by reference.
8 8
9 9 from mercurial.i18n import _
10 from mercurial import cmdutil, osutil, util
10 from mercurial import osutil, util
11 11 import common
12 import errno, os, select, socket, stat, struct, sys, tempfile, time
12 import server
13 import errno, os, select, stat, sys, time
13 14
14 15 try:
15 16 import linux as inotify
16 17 from linux import watcher
17 18 except ImportError:
18 19 raise
19 20
20 class AlreadyStartedException(Exception): pass
21
22 def join(a, b):
23 if a:
24 if a[-1] == '/':
25 return a + b
26 return a + '/' + b
27 return b
28
29 def split(path):
30 c = path.rfind('/')
31 if c == -1:
32 return '', path
33 return path[:c], path[c+1:]
34
35 walk_ignored_errors = (errno.ENOENT, errno.ENAMETOOLONG)
36
37 21 def walkrepodirs(dirstate, absroot):
38 22 '''Iterate over all subdirectories of this repo.
39 23 Exclude the .hg directory, any nested repos, and ignored dirs.'''
40 24 def walkit(dirname, top):
41 fullpath = join(absroot, dirname)
25 fullpath = server.join(absroot, dirname)
42 26 try:
43 27 for name, kind in osutil.listdir(fullpath):
44 28 if kind == stat.S_IFDIR:
45 29 if name == '.hg':
46 30 if not top:
47 31 return
48 32 else:
49 d = join(dirname, name)
33 d = server.join(dirname, name)
50 34 if dirstate._ignore(d):
51 35 continue
52 36 for subdir in walkit(d, False):
53 37 yield subdir
54 38 except OSError, err:
55 if err.errno not in walk_ignored_errors:
39 if err.errno not in server.walk_ignored_errors:
56 40 raise
57 41 yield fullpath
58 42
59 43 return walkit('', True)
60 44
61 def walk(dirstate, absroot, root):
62 '''Like os.walk, but only yields regular files.'''
63
64 # This function is critical to performance during startup.
65
66 def walkit(root, reporoot):
67 files, dirs = [], []
68
69 try:
70 fullpath = join(absroot, root)
71 for name, kind in osutil.listdir(fullpath):
72 if kind == stat.S_IFDIR:
73 if name == '.hg':
74 if not reporoot:
75 return
76 else:
77 dirs.append(name)
78 path = join(root, name)
79 if dirstate._ignore(path):
80 continue
81 for result in walkit(path, False):
82 yield result
83 elif kind in (stat.S_IFREG, stat.S_IFLNK):
84 files.append(name)
85 yield fullpath, dirs, files
86
87 except OSError, err:
88 if err.errno == errno.ENOTDIR:
89 # fullpath was a directory, but has since been replaced
90 # by a file.
91 yield fullpath, dirs, files
92 elif err.errno not in walk_ignored_errors:
93 raise
94
95 return walkit(root, root == '')
96
97 45 def _explain_watch_limit(ui, dirstate, rootabs):
98 46 path = '/proc/sys/fs/inotify/max_user_watches'
99 47 try:
100 48 limit = int(file(path).read())
101 49 except IOError, err:
102 50 if err.errno != errno.ENOENT:
103 51 raise
104 52 raise util.Abort(_('this system does not seem to '
105 53 'support inotify'))
106 54 ui.warn(_('*** the current per-user limit on the number '
107 55 'of inotify watches is %s\n') % limit)
108 56 ui.warn(_('*** this limit is too low to watch every '
109 57 'directory in this repository\n'))
110 58 ui.warn(_('*** counting directories: '))
111 59 ndirs = len(list(walkrepodirs(dirstate, rootabs)))
112 60 ui.warn(_('found %d\n') % ndirs)
113 61 newlimit = min(limit, 1024)
114 62 while newlimit < ((limit + ndirs) * 1.1):
115 63 newlimit *= 2
116 64 ui.warn(_('*** to raise the limit from %d to %d (run as root):\n') %
117 65 (limit, newlimit))
118 66 ui.warn(_('*** echo %d > %s\n') % (newlimit, path))
119 67 raise util.Abort(_('cannot watch %s until inotify watch limit is raised')
120 68 % rootabs)
121 69
122 70 class pollable(object):
123 71 """
124 72 Interface to support polling.
125 73 The file descriptor returned by fileno() is registered to a polling
126 74 object.
127 75 Usage:
128 76 Every tick, check if an event has happened since the last tick:
129 77 * If yes, call handle_events
130 78 * If no, call handle_timeout
131 79 """
132 80 poll_events = select.POLLIN
133 81 instances = {}
134 82 poll = select.poll()
135 83
136 84 def fileno(self):
137 85 raise NotImplementedError
138 86
139 87 def handle_events(self, events):
140 88 raise NotImplementedError
141 89
142 90 def handle_timeout(self):
143 91 raise NotImplementedError
144 92
145 93 def shutdown(self):
146 94 raise NotImplementedError
147 95
148 96 def register(self, timeout):
149 97 fd = self.fileno()
150 98
151 99 pollable.poll.register(fd, pollable.poll_events)
152 100 pollable.instances[fd] = self
153 101
154 102 self.registered = True
155 103 self.timeout = timeout
156 104
157 105 def unregister(self):
158 106 pollable.poll.unregister(self)
159 107 self.registered = False
160 108
161 109 @classmethod
162 110 def run(cls):
163 111 while True:
164 112 timeout = None
165 113 timeobj = None
166 114 for obj in cls.instances.itervalues():
167 115 if obj.timeout is not None and (timeout is None or obj.timeout < timeout):
168 116 timeout, timeobj = obj.timeout, obj
169 117 try:
170 118 events = cls.poll.poll(timeout)
171 119 except select.error, err:
172 120 if err[0] == errno.EINTR:
173 121 continue
174 122 raise
175 123 if events:
176 124 by_fd = {}
177 125 for fd, event in events:
178 126 by_fd.setdefault(fd, []).append(event)
179 127
180 128 for fd, events in by_fd.iteritems():
181 129 cls.instances[fd].handle_pollevents(events)
182 130
183 131 elif timeobj:
184 132 timeobj.handle_timeout()
185 133
186 134 def eventaction(code):
187 135 """
188 136 Decorator to help handle events in repowatcher
189 137 """
190 138 def decorator(f):
191 139 def wrapper(self, wpath):
192 140 if code == 'm' and wpath in self.lastevent and \
193 141 self.lastevent[wpath] in 'cm':
194 142 return
195 143 self.lastevent[wpath] = code
196 144 self.timeout = 250
197 145
198 146 f(self, wpath)
199 147
200 148 wrapper.func_name = f.func_name
201 149 return wrapper
202 150 return decorator
203 151
204 class directory(object):
205 """
206 Representing a directory
207
208 * path is the relative path from repo root to this directory
209 * files is a dict listing the files in this directory
210 - keys are file names
211 - values are file status
212 * dirs is a dict listing the subdirectories
213 - key are subdirectories names
214 - values are directory objects
215 """
216 def __init__(self, relpath=''):
217 self.path = relpath
218 self.files = {}
219 self.dirs = {}
220
221 def dir(self, relpath):
222 """
223 Returns the directory contained at the relative path relpath.
224 Creates the intermediate directories if necessary.
225 """
226 if not relpath:
227 return self
228 l = relpath.split('/')
229 ret = self
230 while l:
231 next = l.pop(0)
232 try:
233 ret = ret.dirs[next]
234 except KeyError:
235 d = directory(join(ret.path, next))
236 ret.dirs[next] = d
237 ret = d
238 return ret
239
240 def walk(self, states, visited=None):
241 """
242 yield (filename, status) pairs for items in the trees
243 that have status in states.
244 filenames are relative to the repo root
245 """
246 for file, st in self.files.iteritems():
247 if st in states:
248 yield join(self.path, file), st
249 for dir in self.dirs.itervalues():
250 if visited is not None:
251 visited.add(dir.path)
252 for e in dir.walk(states):
253 yield e
254
255 def lookup(self, states, path, visited):
256 """
257 yield root-relative filenames that match path, and whose
258 status are in states:
259 * if path is a file, yield path
260 * if path is a directory, yield directory files
261 * if path is not tracked, yield nothing
262 """
263 if path[-1] == '/':
264 path = path[:-1]
265
266 paths = path.split('/')
267
268 # we need to check separately for last node
269 last = paths.pop()
270
271 tree = self
272 try:
273 for dir in paths:
274 tree = tree.dirs[dir]
275 except KeyError:
276 # path is not tracked
277 visited.add(tree.path)
278 return
279
280 try:
281 # if path is a directory, walk it
282 target = tree.dirs[last]
283 visited.add(target.path)
284 for file, st in target.walk(states, visited):
285 yield file
286 except KeyError:
287 try:
288 if tree.files[last] in states:
289 # path is a file
290 visited.add(tree.path)
291 yield path
292 except KeyError:
293 # path is not tracked
294 pass
295
296 class repowatcher(pollable):
152 class repowatcher(server.repowatcher, pollable):
297 153 """
298 154 Watches inotify events
299 155 """
300 statuskeys = 'almr!?'
301 156 mask = (
302 157 inotify.IN_ATTRIB |
303 158 inotify.IN_CREATE |
304 159 inotify.IN_DELETE |
305 160 inotify.IN_DELETE_SELF |
306 161 inotify.IN_MODIFY |
307 162 inotify.IN_MOVED_FROM |
308 163 inotify.IN_MOVED_TO |
309 164 inotify.IN_MOVE_SELF |
310 165 inotify.IN_ONLYDIR |
311 166 inotify.IN_UNMOUNT |
312 167 0)
313 168
314 169 def __init__(self, ui, dirstate, root):
315 self.ui = ui
316 self.dirstate = dirstate
170 server.repowatcher.__init__(self, ui, dirstate, root)
317 171
318 self.wprefix = join(root, '')
319 self.prefixlen = len(self.wprefix)
172 self.lastevent = {}
320 173 try:
321 174 self.watcher = watcher.watcher()
322 175 except OSError, err:
323 176 raise util.Abort(_('inotify service not available: %s') %
324 177 err.strerror)
325 178 self.threshold = watcher.threshold(self.watcher)
326 179 self.fileno = self.watcher.fileno
327
328 self.tree = directory()
329 self.statcache = {}
330 self.statustrees = dict([(s, directory()) for s in self.statuskeys])
331
332 self.last_event = None
333
334 self.lastevent = {}
335
336 180 self.register(timeout=None)
337 181
338 self.ds_info = self.dirstate_info()
339 182 self.handle_timeout()
340 183 self.scan()
341 184
342 185 def event_time(self):
343 186 last = self.last_event
344 187 now = time.time()
345 188 self.last_event = now
346 189
347 190 if last is None:
348 191 return 'start'
349 192 delta = now - last
350 193 if delta < 5:
351 194 return '+%.3f' % delta
352 195 if delta < 50:
353 196 return '+%.2f' % delta
354 197 return '+%.1f' % delta
355 198
356 def dirstate_info(self):
357 try:
358 st = os.lstat(self.wprefix + '.hg/dirstate')
359 return st.st_mtime, st.st_ino
360 except OSError, err:
361 if err.errno != errno.ENOENT:
362 raise
363 return 0, 0
364
365 199 def add_watch(self, path, mask):
366 200 if not path:
367 201 return
368 202 if self.watcher.path(path) is None:
369 203 if self.ui.debugflag:
370 204 self.ui.note(_('watching %r\n') % path[self.prefixlen:])
371 205 try:
372 206 self.watcher.add(path, mask)
373 207 except OSError, err:
374 208 if err.errno in (errno.ENOENT, errno.ENOTDIR):
375 209 return
376 210 if err.errno != errno.ENOSPC:
377 211 raise
378 212 _explain_watch_limit(self.ui, self.dirstate, self.wprefix)
379 213
380 214 def setup(self):
381 215 self.ui.note(_('watching directories under %r\n') % self.wprefix)
382 216 self.add_watch(self.wprefix + '.hg', inotify.IN_DELETE)
383 217 self.check_dirstate()
384 218
385 def filestatus(self, fn, st):
386 try:
387 type_, mode, size, time = self.dirstate._map[fn][:4]
388 except KeyError:
389 type_ = '?'
390 if type_ == 'n':
391 st_mode, st_size, st_mtime = st
392 if size == -1:
393 return 'l'
394 if size and (size != st_size or (mode ^ st_mode) & 0100):
395 return 'm'
396 if time != int(st_mtime):
397 return 'l'
398 return 'n'
399 if type_ == '?' and self.dirstate._ignore(fn):
400 return 'i'
401 return type_
402
403 def updatefile(self, wfn, osstat):
404 '''
405 update the file entry of an existing file.
406
407 osstat: (mode, size, time) tuple, as returned by os.lstat(wfn)
408 '''
409
410 self._updatestatus(wfn, self.filestatus(wfn, osstat))
411
412 def deletefile(self, wfn, oldstatus):
413 '''
414 update the entry of a file which has been deleted.
415
416 oldstatus: char in statuskeys, status of the file before deletion
417 '''
418 if oldstatus == 'r':
419 newstatus = 'r'
420 elif oldstatus in 'almn':
421 newstatus = '!'
422 else:
423 newstatus = None
424
425 self.statcache.pop(wfn, None)
426 self._updatestatus(wfn, newstatus)
427
428 def _updatestatus(self, wfn, newstatus):
429 '''
430 Update the stored status of a file.
431
432 newstatus: - char in (statuskeys + 'ni'), new status to apply.
433 - or None, to stop tracking wfn
434 '''
435 root, fn = split(wfn)
436 d = self.tree.dir(root)
437
438 oldstatus = d.files.get(fn)
439 # oldstatus can be either:
440 # - None : fn is new
441 # - a char in statuskeys: fn is a (tracked) file
442
443 if self.ui.debugflag and oldstatus != newstatus:
444 self.ui.note(_('status: %r %s -> %s\n') %
445 (wfn, oldstatus, newstatus))
446
447 if oldstatus and oldstatus in self.statuskeys \
448 and oldstatus != newstatus:
449 del self.statustrees[oldstatus].dir(root).files[fn]
450
451 if newstatus in (None, 'i'):
452 d.files.pop(fn, None)
453 elif oldstatus != newstatus:
454 d.files[fn] = newstatus
455 if newstatus != 'n':
456 self.statustrees[newstatus].dir(root).files[fn] = newstatus
457
458
459 def check_deleted(self, key):
460 # Files that had been deleted but were present in the dirstate
461 # may have vanished from the dirstate; we must clean them up.
462 nuke = []
463 for wfn, ignore in self.statustrees[key].walk(key):
464 if wfn not in self.dirstate:
465 nuke.append(wfn)
466 for wfn in nuke:
467 root, fn = split(wfn)
468 del self.statustrees[key].dir(root).files[fn]
469 del self.tree.dir(root).files[fn]
470
471 219 def scan(self, topdir=''):
472 220 ds = self.dirstate._map.copy()
473 self.add_watch(join(self.wprefix, topdir), self.mask)
474 for root, dirs, files in walk(self.dirstate, self.wprefix, topdir):
221 self.add_watch(server.join(self.wprefix, topdir), self.mask)
222 for root, dirs, files in server.walk(self.dirstate, self.wprefix,
223 topdir):
475 224 for d in dirs:
476 self.add_watch(join(root, d), self.mask)
225 self.add_watch(server.join(root, d), self.mask)
477 226 wroot = root[self.prefixlen:]
478 227 for fn in files:
479 wfn = join(wroot, fn)
228 wfn = server.join(wroot, fn)
480 229 self.updatefile(wfn, self.getstat(wfn))
481 230 ds.pop(wfn, None)
482 231 wtopdir = topdir
483 232 if wtopdir and wtopdir[-1] != '/':
484 233 wtopdir += '/'
485 234 for wfn, state in ds.iteritems():
486 235 if not wfn.startswith(wtopdir):
487 236 continue
488 237 try:
489 238 st = self.stat(wfn)
490 239 except OSError:
491 240 status = state[0]
492 241 self.deletefile(wfn, status)
493 242 else:
494 243 self.updatefile(wfn, st)
495 244 self.check_deleted('!')
496 245 self.check_deleted('r')
497 246
498 def check_dirstate(self):
499 ds_info = self.dirstate_info()
500 if ds_info == self.ds_info:
501 return
502 self.ds_info = ds_info
503 if not self.ui.debugflag:
504 self.last_event = None
505 self.ui.note(_('%s dirstate reload\n') % self.event_time())
506 self.dirstate.invalidate()
507 self.handle_timeout()
508 self.scan()
509 self.ui.note(_('%s end dirstate reload\n') % self.event_time())
510
511 def update_hgignore(self):
512 # An update of the ignore file can potentially change the
513 # states of all unknown and ignored files.
514
515 # XXX If the user has other ignore files outside the repo, or
516 # changes their list of ignore files at run time, we'll
517 # potentially never see changes to them. We could get the
518 # client to report to us what ignore data they're using.
519 # But it's easier to do nothing than to open that can of
520 # worms.
521
522 if '_ignore' in self.dirstate.__dict__:
523 delattr(self.dirstate, '_ignore')
524 self.ui.note(_('rescanning due to .hgignore change\n'))
525 self.handle_timeout()
526 self.scan()
527
528 def getstat(self, wpath):
529 try:
530 return self.statcache[wpath]
531 except KeyError:
532 try:
533 return self.stat(wpath)
534 except OSError, err:
535 if err.errno != errno.ENOENT:
536 raise
537
538 def stat(self, wpath):
539 try:
540 st = os.lstat(join(self.wprefix, wpath))
541 ret = st.st_mode, st.st_size, st.st_mtime
542 self.statcache[wpath] = ret
543 return ret
544 except OSError:
545 self.statcache.pop(wpath, None)
546 raise
547
548 247 @eventaction('c')
549 248 def created(self, wpath):
550 249 if wpath == '.hgignore':
551 250 self.update_hgignore()
552 251 try:
553 252 st = self.stat(wpath)
554 253 if stat.S_ISREG(st[0]):
555 254 self.updatefile(wpath, st)
556 255 except OSError:
557 256 pass
558 257
559 258 @eventaction('m')
560 259 def modified(self, wpath):
561 260 if wpath == '.hgignore':
562 261 self.update_hgignore()
563 262 try:
564 263 st = self.stat(wpath)
565 264 if stat.S_ISREG(st[0]):
566 265 if self.dirstate[wpath] in 'lmn':
567 266 self.updatefile(wpath, st)
568 267 except OSError:
569 268 pass
570 269
571 270 @eventaction('d')
572 271 def deleted(self, wpath):
573 272 if wpath == '.hgignore':
574 273 self.update_hgignore()
575 274 elif wpath.startswith('.hg/'):
576 275 if wpath == '.hg/wlock':
577 276 self.check_dirstate()
578 277 return
579 278
580 279 self.deletefile(wpath, self.dirstate[wpath])
581 280
582 281 def process_create(self, wpath, evt):
583 282 if self.ui.debugflag:
584 283 self.ui.note(_('%s event: created %s\n') %
585 284 (self.event_time(), wpath))
586 285
587 286 if evt.mask & inotify.IN_ISDIR:
588 287 self.scan(wpath)
589 288 else:
590 289 self.created(wpath)
591 290
592 291 def process_delete(self, wpath, evt):
593 292 if self.ui.debugflag:
594 293 self.ui.note(_('%s event: deleted %s\n') %
595 294 (self.event_time(), wpath))
596 295
597 296 if evt.mask & inotify.IN_ISDIR:
598 297 tree = self.tree.dir(wpath)
599 298 todelete = [wfn for wfn, ignore in tree.walk('?')]
600 299 for fn in todelete:
601 300 self.deletefile(fn, '?')
602 301 self.scan(wpath)
603 302 else:
604 303 self.deleted(wpath)
605 304
606 305 def process_modify(self, wpath, evt):
607 306 if self.ui.debugflag:
608 307 self.ui.note(_('%s event: modified %s\n') %
609 308 (self.event_time(), wpath))
610 309
611 310 if not (evt.mask & inotify.IN_ISDIR):
612 311 self.modified(wpath)
613 312
614 313 def process_unmount(self, evt):
615 314 self.ui.warn(_('filesystem containing %s was unmounted\n') %
616 315 evt.fullpath)
617 316 sys.exit(0)
618 317
619 318 def handle_pollevents(self, events):
620 319 if self.ui.debugflag:
621 320 self.ui.note(_('%s readable: %d bytes\n') %
622 321 (self.event_time(), self.threshold.readable()))
623 322 if not self.threshold():
624 323 if self.registered:
625 324 if self.ui.debugflag:
626 325 self.ui.note(_('%s below threshold - unhooking\n') %
627 326 (self.event_time()))
628 327 self.unregister()
629 328 self.timeout = 250
630 329 else:
631 330 self.read_events()
632 331
633 332 def read_events(self, bufsize=None):
634 333 events = self.watcher.read(bufsize)
635 334 if self.ui.debugflag:
636 335 self.ui.note(_('%s reading %d events\n') %
637 336 (self.event_time(), len(events)))
638 337 for evt in events:
639 338 assert evt.fullpath.startswith(self.wprefix)
640 339 wpath = evt.fullpath[self.prefixlen:]
641 340
642 341 # paths have been normalized, wpath never ends with a '/'
643 342
644 343 if wpath.startswith('.hg/') and evt.mask & inotify.IN_ISDIR:
645 344 # ignore subdirectories of .hg/ (merge, patches...)
646 345 continue
647 346
648 347 if evt.mask & inotify.IN_UNMOUNT:
649 348 self.process_unmount(wpath, evt)
650 349 elif evt.mask & (inotify.IN_MODIFY | inotify.IN_ATTRIB):
651 350 self.process_modify(wpath, evt)
652 351 elif evt.mask & (inotify.IN_DELETE | inotify.IN_DELETE_SELF |
653 352 inotify.IN_MOVED_FROM):
654 353 self.process_delete(wpath, evt)
655 354 elif evt.mask & (inotify.IN_CREATE | inotify.IN_MOVED_TO):
656 355 self.process_create(wpath, evt)
657 356
658 357 self.lastevent.clear()
659 358
660 359 def handle_timeout(self):
661 360 if not self.registered:
662 361 if self.ui.debugflag:
663 362 self.ui.note(_('%s hooking back up with %d bytes readable\n') %
664 363 (self.event_time(), self.threshold.readable()))
665 364 self.read_events(0)
666 365 self.register(timeout=None)
667 366
668 367 self.timeout = None
669 368
670 369 def shutdown(self):
671 370 self.watcher.close()
672 371
673 372 def debug(self):
674 373 """
675 374 Returns a sorted list of relatives paths currently watched,
676 375 for debugging purposes.
677 376 """
678 377 return sorted(tuple[0][self.prefixlen:] for tuple in self.watcher)
679 378
680 class server(pollable):
379 class socketlistener(server.socketlistener, pollable):
681 380 """
682 381 Listens for client queries on unix socket inotify.sock
683 382 """
684 383 def __init__(self, ui, root, repowatcher, timeout):
685 self.ui = ui
686 self.repowatcher = repowatcher
687 self.sock = socket.socket(socket.AF_UNIX)
688 self.sockpath = join(root, '.hg/inotify.sock')
689 self.realsockpath = None
690 try:
691 self.sock.bind(self.sockpath)
692 except socket.error, err:
693 if err[0] == errno.EADDRINUSE:
694 raise AlreadyStartedException( _('cannot start: socket is '
695 'already bound'))
696 if err[0] == "AF_UNIX path too long":
697 if os.path.islink(self.sockpath) and \
698 not os.path.exists(self.sockpath):
699 raise util.Abort('inotify-server: cannot start: '
700 '.hg/inotify.sock is a broken symlink')
701 tempdir = tempfile.mkdtemp(prefix="hg-inotify-")
702 self.realsockpath = os.path.join(tempdir, "inotify.sock")
703 try:
704 self.sock.bind(self.realsockpath)
705 os.symlink(self.realsockpath, self.sockpath)
706 except (OSError, socket.error), inst:
707 try:
708 os.unlink(self.realsockpath)
709 except:
710 pass
711 os.rmdir(tempdir)
712 if inst.errno == errno.EEXIST:
713 raise AlreadyStartedException(_('cannot start: tried '
714 'linking .hg/inotify.sock to a temporary socket but'
715 ' .hg/inotify.sock already exists'))
716 raise
717 else:
718 raise
719 self.sock.listen(5)
720 self.fileno = self.sock.fileno
384 server.socketlistener.__init__(self, ui, root, repowatcher, timeout)
721 385 self.register(timeout=timeout)
722 386
723 387 def handle_timeout(self):
724 388 pass
725 389
726 def answer_stat_query(self, cs):
727 names = cs.read().split('\0')
728
729 states = names.pop()
730
731 self.ui.note(_('answering query for %r\n') % states)
732
733 if self.repowatcher.timeout:
734 # We got a query while a rescan is pending. Make sure we
735 # rescan before responding, or we could give back a wrong
736 # answer.
737 self.repowatcher.handle_timeout()
738
739 visited = set()
740 if not names:
741 def genresult(states, tree):
742 for fn, state in tree.walk(states):
743 yield fn
744 else:
745 def genresult(states, tree):
746 for fn in names:
747 for f in tree.lookup(states, fn, visited):
748 yield f
749
750 return ['\0'.join(r) for r in [
751 genresult('l', self.repowatcher.statustrees['l']),
752 genresult('m', self.repowatcher.statustrees['m']),
753 genresult('a', self.repowatcher.statustrees['a']),
754 genresult('r', self.repowatcher.statustrees['r']),
755 genresult('!', self.repowatcher.statustrees['!']),
756 '?' in states
757 and genresult('?', self.repowatcher.statustrees['?'])
758 or [],
759 [],
760 'c' in states and genresult('n', self.repowatcher.tree) or [],
761 visited
762 ]]
763
764 def answer_dbug_query(self):
765 return ['\0'.join(self.repowatcher.debug())]
766
767 390 def handle_pollevents(self, events):
768 391 for e in events:
769 self.handle_pollevent()
770
771 def handle_pollevent(self):
772 sock, addr = self.sock.accept()
773
774 cs = common.recvcs(sock)
775 version = ord(cs.read(1))
776
777 if version != common.version:
778 self.ui.warn(_('received query from incompatible client '
779 'version %d\n') % version)
780 try:
781 # try to send back our version to the client
782 # this way, the client too is informed of the mismatch
783 sock.sendall(chr(common.version))
784 except:
785 pass
786 return
787
788 type = cs.read(4)
789
790 if type == 'STAT':
791 results = self.answer_stat_query(cs)
792 elif type == 'DBUG':
793 results = self.answer_dbug_query()
794 else:
795 self.ui.warn(_('unrecognized query type: %s\n') % type)
796 return
797
798 try:
799 try:
800 v = chr(common.version)
801
802 sock.sendall(v + type + struct.pack(common.resphdrfmts[type],
803 *map(len, results)))
804 sock.sendall(''.join(results))
805 finally:
806 sock.shutdown(socket.SHUT_WR)
807 except socket.error, err:
808 if err[0] != errno.EPIPE:
809 raise
392 self.accept_connection()
810 393
811 394 def shutdown(self):
812 395 self.sock.close()
813 396 try:
814 397 os.unlink(self.sockpath)
815 398 if self.realsockpath:
816 399 os.unlink(self.realsockpath)
817 400 os.rmdir(os.path.dirname(self.realsockpath))
818 401 except OSError, err:
819 402 if err.errno != errno.ENOENT:
820 403 raise
821 404
405 def answer_stat_query(self, cs):
406 if self.repowatcher.timeout:
407 # We got a query while a rescan is pending. Make sure we
408 # rescan before responding, or we could give back a wrong
409 # answer.
410 self.repowatcher.handle_timeout()
411 return server.socketlistener.answer_stat_query(self, cs)
412
822 413 class master(object):
823 414 def __init__(self, ui, dirstate, root, timeout=None):
824 415 self.ui = ui
825 416 self.repowatcher = repowatcher(ui, dirstate, root)
826 self.server = server(ui, root, self.repowatcher, timeout)
417 self.socketlistener = socketlistener(ui, root, self.repowatcher,
418 timeout)
827 419
828 420 def shutdown(self):
829 421 for obj in pollable.instances.itervalues():
830 422 obj.shutdown()
831 423
832 424 def run(self):
833 425 self.repowatcher.setup()
834 426 self.ui.note(_('finished setup\n'))
835 427 if os.getenv('TIME_STARTUP'):
836 428 sys.exit(0)
837 429 pollable.run()
838
839 def start(ui, dirstate, root, opts):
840 timeout = opts.get('timeout')
841 if timeout:
842 timeout = float(timeout) * 1e3
843
844 class service(object):
845 def init(self):
846 try:
847 self.master = master(ui, dirstate, root, timeout)
848 except AlreadyStartedException, inst:
849 raise util.Abort("inotify-server: %s" % inst)
850
851 def run(self):
852 try:
853 self.master.run()
854 finally:
855 self.master.shutdown()
856
857 if 'inserve' not in sys.argv:
858 runargs = [sys.argv[0], 'inserve', '-R', root]
859 else:
860 runargs = sys.argv[:]
861
862 pidfile = ui.config('inotify', 'pidfile')
863 if opts['daemon'] and pidfile is not None and 'pid-file' not in runargs:
864 runargs.append("--pid-file=%s" % pidfile)
865
866 service = service()
867 logfile = ui.config('inotify', 'log')
868 cmdutil.service(opts, initfn=service.init, runfn=service.run,
869 logfile=logfile, runargs=runargs)
@@ -1,869 +1,486 b''
1 # server.py - inotify status server
1 # server.py - common entry point for inotify status server
2 2 #
3 # Copyright 2006, 2007, 2008 Bryan O'Sullivan <bos@serpentine.com>
4 # Copyright 2007, 2008 Brendan Cully <brendan@kublai.com>
3 # Copyright 2009 Nicolas Dumazet <nicdumz@gmail.com>
5 4 #
6 5 # This software may be used and distributed according to the terms of the
7 6 # GNU General Public License version 2, incorporated herein by reference.
8 7
9 8 from mercurial.i18n import _
10 9 from mercurial import cmdutil, osutil, util
11 10 import common
12 import errno, os, select, socket, stat, struct, sys, tempfile, time
13 11
14 try:
15 import linux as inotify
16 from linux import watcher
17 except ImportError:
18 raise
12 import errno
13 import os
14 import socket
15 import stat
16 import struct
17 import sys
18 import tempfile
19 19
20 20 class AlreadyStartedException(Exception): pass
21 21
22 22 def join(a, b):
23 23 if a:
24 24 if a[-1] == '/':
25 25 return a + b
26 26 return a + '/' + b
27 27 return b
28 28
29 29 def split(path):
30 30 c = path.rfind('/')
31 31 if c == -1:
32 32 return '', path
33 33 return path[:c], path[c+1:]
34 34
35 35 walk_ignored_errors = (errno.ENOENT, errno.ENAMETOOLONG)
36 36
37 def walkrepodirs(dirstate, absroot):
38 '''Iterate over all subdirectories of this repo.
39 Exclude the .hg directory, any nested repos, and ignored dirs.'''
40 def walkit(dirname, top):
41 fullpath = join(absroot, dirname)
42 try:
43 for name, kind in osutil.listdir(fullpath):
44 if kind == stat.S_IFDIR:
45 if name == '.hg':
46 if not top:
47 return
48 else:
49 d = join(dirname, name)
50 if dirstate._ignore(d):
51 continue
52 for subdir in walkit(d, False):
53 yield subdir
54 except OSError, err:
55 if err.errno not in walk_ignored_errors:
56 raise
57 yield fullpath
58
59 return walkit('', True)
60
61 37 def walk(dirstate, absroot, root):
62 38 '''Like os.walk, but only yields regular files.'''
63 39
64 40 # This function is critical to performance during startup.
65 41
66 42 def walkit(root, reporoot):
67 43 files, dirs = [], []
68 44
69 45 try:
70 46 fullpath = join(absroot, root)
71 47 for name, kind in osutil.listdir(fullpath):
72 48 if kind == stat.S_IFDIR:
73 49 if name == '.hg':
74 50 if not reporoot:
75 51 return
76 52 else:
77 53 dirs.append(name)
78 54 path = join(root, name)
79 55 if dirstate._ignore(path):
80 56 continue
81 57 for result in walkit(path, False):
82 58 yield result
83 59 elif kind in (stat.S_IFREG, stat.S_IFLNK):
84 60 files.append(name)
85 61 yield fullpath, dirs, files
86 62
87 63 except OSError, err:
88 64 if err.errno == errno.ENOTDIR:
89 65 # fullpath was a directory, but has since been replaced
90 66 # by a file.
91 67 yield fullpath, dirs, files
92 68 elif err.errno not in walk_ignored_errors:
93 69 raise
94 70
95 71 return walkit(root, root == '')
96 72
97 def _explain_watch_limit(ui, dirstate, rootabs):
98 path = '/proc/sys/fs/inotify/max_user_watches'
99 try:
100 limit = int(file(path).read())
101 except IOError, err:
102 if err.errno != errno.ENOENT:
103 raise
104 raise util.Abort(_('this system does not seem to '
105 'support inotify'))
106 ui.warn(_('*** the current per-user limit on the number '
107 'of inotify watches is %s\n') % limit)
108 ui.warn(_('*** this limit is too low to watch every '
109 'directory in this repository\n'))
110 ui.warn(_('*** counting directories: '))
111 ndirs = len(list(walkrepodirs(dirstate, rootabs)))
112 ui.warn(_('found %d\n') % ndirs)
113 newlimit = min(limit, 1024)
114 while newlimit < ((limit + ndirs) * 1.1):
115 newlimit *= 2
116 ui.warn(_('*** to raise the limit from %d to %d (run as root):\n') %
117 (limit, newlimit))
118 ui.warn(_('*** echo %d > %s\n') % (newlimit, path))
119 raise util.Abort(_('cannot watch %s until inotify watch limit is raised')
120 % rootabs)
121
122 class pollable(object):
123 """
124 Interface to support polling.
125 The file descriptor returned by fileno() is registered to a polling
126 object.
127 Usage:
128 Every tick, check if an event has happened since the last tick:
129 * If yes, call handle_events
130 * If no, call handle_timeout
131 """
132 poll_events = select.POLLIN
133 instances = {}
134 poll = select.poll()
135
136 def fileno(self):
137 raise NotImplementedError
138
139 def handle_events(self, events):
140 raise NotImplementedError
141
142 def handle_timeout(self):
143 raise NotImplementedError
144
145 def shutdown(self):
146 raise NotImplementedError
147
148 def register(self, timeout):
149 fd = self.fileno()
150
151 pollable.poll.register(fd, pollable.poll_events)
152 pollable.instances[fd] = self
153
154 self.registered = True
155 self.timeout = timeout
156
157 def unregister(self):
158 pollable.poll.unregister(self)
159 self.registered = False
160
161 @classmethod
162 def run(cls):
163 while True:
164 timeout = None
165 timeobj = None
166 for obj in cls.instances.itervalues():
167 if obj.timeout is not None and (timeout is None or obj.timeout < timeout):
168 timeout, timeobj = obj.timeout, obj
169 try:
170 events = cls.poll.poll(timeout)
171 except select.error, err:
172 if err[0] == errno.EINTR:
173 continue
174 raise
175 if events:
176 by_fd = {}
177 for fd, event in events:
178 by_fd.setdefault(fd, []).append(event)
179
180 for fd, events in by_fd.iteritems():
181 cls.instances[fd].handle_pollevents(events)
182
183 elif timeobj:
184 timeobj.handle_timeout()
185
186 def eventaction(code):
187 """
188 Decorator to help handle events in repowatcher
189 """
190 def decorator(f):
191 def wrapper(self, wpath):
192 if code == 'm' and wpath in self.lastevent and \
193 self.lastevent[wpath] in 'cm':
194 return
195 self.lastevent[wpath] = code
196 self.timeout = 250
197
198 f(self, wpath)
199
200 wrapper.func_name = f.func_name
201 return wrapper
202 return decorator
203
204 73 class directory(object):
205 74 """
206 75 Representing a directory
207 76
208 77 * path is the relative path from repo root to this directory
209 78 * files is a dict listing the files in this directory
210 79 - keys are file names
211 80 - values are file status
212 81 * dirs is a dict listing the subdirectories
213 82 - key are subdirectories names
214 83 - values are directory objects
215 84 """
216 85 def __init__(self, relpath=''):
217 86 self.path = relpath
218 87 self.files = {}
219 88 self.dirs = {}
220 89
221 90 def dir(self, relpath):
222 91 """
223 92 Returns the directory contained at the relative path relpath.
224 93 Creates the intermediate directories if necessary.
225 94 """
226 95 if not relpath:
227 96 return self
228 97 l = relpath.split('/')
229 98 ret = self
230 99 while l:
231 100 next = l.pop(0)
232 101 try:
233 102 ret = ret.dirs[next]
234 103 except KeyError:
235 104 d = directory(join(ret.path, next))
236 105 ret.dirs[next] = d
237 106 ret = d
238 107 return ret
239 108
240 109 def walk(self, states, visited=None):
241 110 """
242 111 yield (filename, status) pairs for items in the trees
243 112 that have status in states.
244 113 filenames are relative to the repo root
245 114 """
246 115 for file, st in self.files.iteritems():
247 116 if st in states:
248 117 yield join(self.path, file), st
249 118 for dir in self.dirs.itervalues():
250 119 if visited is not None:
251 120 visited.add(dir.path)
252 121 for e in dir.walk(states):
253 122 yield e
254 123
255 124 def lookup(self, states, path, visited):
256 125 """
257 126 yield root-relative filenames that match path, and whose
258 127 status are in states:
259 128 * if path is a file, yield path
260 129 * if path is a directory, yield directory files
261 130 * if path is not tracked, yield nothing
262 131 """
263 132 if path[-1] == '/':
264 133 path = path[:-1]
265 134
266 135 paths = path.split('/')
267 136
268 137 # we need to check separately for last node
269 138 last = paths.pop()
270 139
271 140 tree = self
272 141 try:
273 142 for dir in paths:
274 143 tree = tree.dirs[dir]
275 144 except KeyError:
276 145 # path is not tracked
277 146 visited.add(tree.path)
278 147 return
279 148
280 149 try:
281 150 # if path is a directory, walk it
282 151 target = tree.dirs[last]
283 152 visited.add(target.path)
284 153 for file, st in target.walk(states, visited):
285 154 yield file
286 155 except KeyError:
287 156 try:
288 157 if tree.files[last] in states:
289 158 # path is a file
290 159 visited.add(tree.path)
291 160 yield path
292 161 except KeyError:
293 162 # path is not tracked
294 163 pass
295 164
296 class repowatcher(pollable):
165 class repowatcher(object):
297 166 """
298 167 Watches inotify events
299 168 """
300 169 statuskeys = 'almr!?'
301 mask = (
302 inotify.IN_ATTRIB |
303 inotify.IN_CREATE |
304 inotify.IN_DELETE |
305 inotify.IN_DELETE_SELF |
306 inotify.IN_MODIFY |
307 inotify.IN_MOVED_FROM |
308 inotify.IN_MOVED_TO |
309 inotify.IN_MOVE_SELF |
310 inotify.IN_ONLYDIR |
311 inotify.IN_UNMOUNT |
312 0)
313 170
314 171 def __init__(self, ui, dirstate, root):
315 172 self.ui = ui
316 173 self.dirstate = dirstate
317 174
318 175 self.wprefix = join(root, '')
319 176 self.prefixlen = len(self.wprefix)
320 try:
321 self.watcher = watcher.watcher()
322 except OSError, err:
323 raise util.Abort(_('inotify service not available: %s') %
324 err.strerror)
325 self.threshold = watcher.threshold(self.watcher)
326 self.fileno = self.watcher.fileno
327 177
328 178 self.tree = directory()
329 179 self.statcache = {}
330 180 self.statustrees = dict([(s, directory()) for s in self.statuskeys])
331 181
182 self.ds_info = self.dirstate_info()
183
332 184 self.last_event = None
333 185
334 self.lastevent = {}
335 186
336 self.register(timeout=None)
337
338 self.ds_info = self.dirstate_info()
339 self.handle_timeout()
340 self.scan()
341
342 def event_time(self):
343 last = self.last_event
344 now = time.time()
345 self.last_event = now
346
347 if last is None:
348 return 'start'
349 delta = now - last
350 if delta < 5:
351 return '+%.3f' % delta
352 if delta < 50:
353 return '+%.2f' % delta
354 return '+%.1f' % delta
187 def handle_timeout(self):
188 pass
355 189
356 190 def dirstate_info(self):
357 191 try:
358 192 st = os.lstat(self.wprefix + '.hg/dirstate')
359 193 return st.st_mtime, st.st_ino
360 194 except OSError, err:
361 195 if err.errno != errno.ENOENT:
362 196 raise
363 197 return 0, 0
364 198
365 def add_watch(self, path, mask):
366 if not path:
367 return
368 if self.watcher.path(path) is None:
369 if self.ui.debugflag:
370 self.ui.note(_('watching %r\n') % path[self.prefixlen:])
371 try:
372 self.watcher.add(path, mask)
373 except OSError, err:
374 if err.errno in (errno.ENOENT, errno.ENOTDIR):
375 return
376 if err.errno != errno.ENOSPC:
377 raise
378 _explain_watch_limit(self.ui, self.dirstate, self.wprefix)
379
380 def setup(self):
381 self.ui.note(_('watching directories under %r\n') % self.wprefix)
382 self.add_watch(self.wprefix + '.hg', inotify.IN_DELETE)
383 self.check_dirstate()
384
385 199 def filestatus(self, fn, st):
386 200 try:
387 201 type_, mode, size, time = self.dirstate._map[fn][:4]
388 202 except KeyError:
389 203 type_ = '?'
390 204 if type_ == 'n':
391 205 st_mode, st_size, st_mtime = st
392 206 if size == -1:
393 207 return 'l'
394 208 if size and (size != st_size or (mode ^ st_mode) & 0100):
395 209 return 'm'
396 210 if time != int(st_mtime):
397 211 return 'l'
398 212 return 'n'
399 213 if type_ == '?' and self.dirstate._ignore(fn):
400 214 return 'i'
401 215 return type_
402 216
403 217 def updatefile(self, wfn, osstat):
404 218 '''
405 219 update the file entry of an existing file.
406 220
407 221 osstat: (mode, size, time) tuple, as returned by os.lstat(wfn)
408 222 '''
409 223
410 224 self._updatestatus(wfn, self.filestatus(wfn, osstat))
411 225
412 226 def deletefile(self, wfn, oldstatus):
413 227 '''
414 228 update the entry of a file which has been deleted.
415 229
416 230 oldstatus: char in statuskeys, status of the file before deletion
417 231 '''
418 232 if oldstatus == 'r':
419 233 newstatus = 'r'
420 234 elif oldstatus in 'almn':
421 235 newstatus = '!'
422 236 else:
423 237 newstatus = None
424 238
425 239 self.statcache.pop(wfn, None)
426 240 self._updatestatus(wfn, newstatus)
427 241
428 242 def _updatestatus(self, wfn, newstatus):
429 243 '''
430 244 Update the stored status of a file.
431 245
432 246 newstatus: - char in (statuskeys + 'ni'), new status to apply.
433 247 - or None, to stop tracking wfn
434 248 '''
435 249 root, fn = split(wfn)
436 250 d = self.tree.dir(root)
437 251
438 252 oldstatus = d.files.get(fn)
439 253 # oldstatus can be either:
440 254 # - None : fn is new
441 255 # - a char in statuskeys: fn is a (tracked) file
442 256
443 257 if self.ui.debugflag and oldstatus != newstatus:
444 258 self.ui.note(_('status: %r %s -> %s\n') %
445 259 (wfn, oldstatus, newstatus))
446 260
447 261 if oldstatus and oldstatus in self.statuskeys \
448 262 and oldstatus != newstatus:
449 263 del self.statustrees[oldstatus].dir(root).files[fn]
450 264
451 265 if newstatus in (None, 'i'):
452 266 d.files.pop(fn, None)
453 267 elif oldstatus != newstatus:
454 268 d.files[fn] = newstatus
455 269 if newstatus != 'n':
456 270 self.statustrees[newstatus].dir(root).files[fn] = newstatus
457 271
458
459 272 def check_deleted(self, key):
460 273 # Files that had been deleted but were present in the dirstate
461 274 # may have vanished from the dirstate; we must clean them up.
462 275 nuke = []
463 276 for wfn, ignore in self.statustrees[key].walk(key):
464 277 if wfn not in self.dirstate:
465 278 nuke.append(wfn)
466 279 for wfn in nuke:
467 280 root, fn = split(wfn)
468 281 del self.statustrees[key].dir(root).files[fn]
469 282 del self.tree.dir(root).files[fn]
470 283
471 def scan(self, topdir=''):
472 ds = self.dirstate._map.copy()
473 self.add_watch(join(self.wprefix, topdir), self.mask)
474 for root, dirs, files in walk(self.dirstate, self.wprefix, topdir):
475 for d in dirs:
476 self.add_watch(join(root, d), self.mask)
477 wroot = root[self.prefixlen:]
478 for fn in files:
479 wfn = join(wroot, fn)
480 self.updatefile(wfn, self.getstat(wfn))
481 ds.pop(wfn, None)
482 wtopdir = topdir
483 if wtopdir and wtopdir[-1] != '/':
484 wtopdir += '/'
485 for wfn, state in ds.iteritems():
486 if not wfn.startswith(wtopdir):
487 continue
488 try:
489 st = self.stat(wfn)
490 except OSError:
491 status = state[0]
492 self.deletefile(wfn, status)
493 else:
494 self.updatefile(wfn, st)
495 self.check_deleted('!')
496 self.check_deleted('r')
497
498 284 def check_dirstate(self):
499 285 ds_info = self.dirstate_info()
500 286 if ds_info == self.ds_info:
501 287 return
502 288 self.ds_info = ds_info
503 289 if not self.ui.debugflag:
504 290 self.last_event = None
505 self.ui.note(_('%s dirstate reload\n') % self.event_time())
506 291 self.dirstate.invalidate()
507 292 self.handle_timeout()
508 293 self.scan()
509 self.ui.note(_('%s end dirstate reload\n') % self.event_time())
510 294
511 295 def update_hgignore(self):
512 296 # An update of the ignore file can potentially change the
513 297 # states of all unknown and ignored files.
514 298
515 299 # XXX If the user has other ignore files outside the repo, or
516 300 # changes their list of ignore files at run time, we'll
517 301 # potentially never see changes to them. We could get the
518 302 # client to report to us what ignore data they're using.
519 303 # But it's easier to do nothing than to open that can of
520 304 # worms.
521 305
522 306 if '_ignore' in self.dirstate.__dict__:
523 307 delattr(self.dirstate, '_ignore')
524 308 self.ui.note(_('rescanning due to .hgignore change\n'))
525 309 self.handle_timeout()
526 310 self.scan()
527 311
528 312 def getstat(self, wpath):
529 313 try:
530 314 return self.statcache[wpath]
531 315 except KeyError:
532 316 try:
533 317 return self.stat(wpath)
534 318 except OSError, err:
535 319 if err.errno != errno.ENOENT:
536 320 raise
537 321
538 322 def stat(self, wpath):
539 323 try:
540 324 st = os.lstat(join(self.wprefix, wpath))
541 325 ret = st.st_mode, st.st_size, st.st_mtime
542 326 self.statcache[wpath] = ret
543 327 return ret
544 328 except OSError:
545 329 self.statcache.pop(wpath, None)
546 330 raise
547 331
548 @eventaction('c')
549 def created(self, wpath):
550 if wpath == '.hgignore':
551 self.update_hgignore()
552 try:
553 st = self.stat(wpath)
554 if stat.S_ISREG(st[0]):
555 self.updatefile(wpath, st)
556 except OSError:
557 pass
558
559 @eventaction('m')
560 def modified(self, wpath):
561 if wpath == '.hgignore':
562 self.update_hgignore()
563 try:
564 st = self.stat(wpath)
565 if stat.S_ISREG(st[0]):
566 if self.dirstate[wpath] in 'lmn':
567 self.updatefile(wpath, st)
568 except OSError:
569 pass
570
571 @eventaction('d')
572 def deleted(self, wpath):
573 if wpath == '.hgignore':
574 self.update_hgignore()
575 elif wpath.startswith('.hg/'):
576 if wpath == '.hg/wlock':
577 self.check_dirstate()
578 return
579
580 self.deletefile(wpath, self.dirstate[wpath])
581
582 def process_create(self, wpath, evt):
583 if self.ui.debugflag:
584 self.ui.note(_('%s event: created %s\n') %
585 (self.event_time(), wpath))
586
587 if evt.mask & inotify.IN_ISDIR:
588 self.scan(wpath)
589 else:
590 self.created(wpath)
591
592 def process_delete(self, wpath, evt):
593 if self.ui.debugflag:
594 self.ui.note(_('%s event: deleted %s\n') %
595 (self.event_time(), wpath))
596
597 if evt.mask & inotify.IN_ISDIR:
598 tree = self.tree.dir(wpath)
599 todelete = [wfn for wfn, ignore in tree.walk('?')]
600 for fn in todelete:
601 self.deletefile(fn, '?')
602 self.scan(wpath)
603 else:
604 self.deleted(wpath)
605
606 def process_modify(self, wpath, evt):
607 if self.ui.debugflag:
608 self.ui.note(_('%s event: modified %s\n') %
609 (self.event_time(), wpath))
610
611 if not (evt.mask & inotify.IN_ISDIR):
612 self.modified(wpath)
613
614 def process_unmount(self, evt):
615 self.ui.warn(_('filesystem containing %s was unmounted\n') %
616 evt.fullpath)
617 sys.exit(0)
618
619 def handle_pollevents(self, events):
620 if self.ui.debugflag:
621 self.ui.note(_('%s readable: %d bytes\n') %
622 (self.event_time(), self.threshold.readable()))
623 if not self.threshold():
624 if self.registered:
625 if self.ui.debugflag:
626 self.ui.note(_('%s below threshold - unhooking\n') %
627 (self.event_time()))
628 self.unregister()
629 self.timeout = 250
630 else:
631 self.read_events()
632
633 def read_events(self, bufsize=None):
634 events = self.watcher.read(bufsize)
635 if self.ui.debugflag:
636 self.ui.note(_('%s reading %d events\n') %
637 (self.event_time(), len(events)))
638 for evt in events:
639 assert evt.fullpath.startswith(self.wprefix)
640 wpath = evt.fullpath[self.prefixlen:]
641
642 # paths have been normalized, wpath never ends with a '/'
643
644 if wpath.startswith('.hg/') and evt.mask & inotify.IN_ISDIR:
645 # ignore subdirectories of .hg/ (merge, patches...)
646 continue
647
648 if evt.mask & inotify.IN_UNMOUNT:
649 self.process_unmount(wpath, evt)
650 elif evt.mask & (inotify.IN_MODIFY | inotify.IN_ATTRIB):
651 self.process_modify(wpath, evt)
652 elif evt.mask & (inotify.IN_DELETE | inotify.IN_DELETE_SELF |
653 inotify.IN_MOVED_FROM):
654 self.process_delete(wpath, evt)
655 elif evt.mask & (inotify.IN_CREATE | inotify.IN_MOVED_TO):
656 self.process_create(wpath, evt)
657
658 self.lastevent.clear()
659
660 def handle_timeout(self):
661 if not self.registered:
662 if self.ui.debugflag:
663 self.ui.note(_('%s hooking back up with %d bytes readable\n') %
664 (self.event_time(), self.threshold.readable()))
665 self.read_events(0)
666 self.register(timeout=None)
667
668 self.timeout = None
669
670 def shutdown(self):
671 self.watcher.close()
672
673 def debug(self):
674 """
675 Returns a sorted list of relatives paths currently watched,
676 for debugging purposes.
677 """
678 return sorted(tuple[0][self.prefixlen:] for tuple in self.watcher)
679
680 class server(pollable):
332 class socketlistener(object):
681 333 """
682 334 Listens for client queries on unix socket inotify.sock
683 335 """
684 336 def __init__(self, ui, root, repowatcher, timeout):
685 337 self.ui = ui
686 338 self.repowatcher = repowatcher
687 339 self.sock = socket.socket(socket.AF_UNIX)
688 340 self.sockpath = join(root, '.hg/inotify.sock')
689 341 self.realsockpath = None
690 342 try:
691 343 self.sock.bind(self.sockpath)
692 344 except socket.error, err:
693 345 if err[0] == errno.EADDRINUSE:
694 346 raise AlreadyStartedException( _('cannot start: socket is '
695 347 'already bound'))
696 348 if err[0] == "AF_UNIX path too long":
697 349 if os.path.islink(self.sockpath) and \
698 350 not os.path.exists(self.sockpath):
699 351 raise util.Abort('inotify-server: cannot start: '
700 352 '.hg/inotify.sock is a broken symlink')
701 353 tempdir = tempfile.mkdtemp(prefix="hg-inotify-")
702 354 self.realsockpath = os.path.join(tempdir, "inotify.sock")
703 355 try:
704 356 self.sock.bind(self.realsockpath)
705 357 os.symlink(self.realsockpath, self.sockpath)
706 358 except (OSError, socket.error), inst:
707 359 try:
708 360 os.unlink(self.realsockpath)
709 361 except:
710 362 pass
711 363 os.rmdir(tempdir)
712 364 if inst.errno == errno.EEXIST:
713 365 raise AlreadyStartedException(_('cannot start: tried '
714 366 'linking .hg/inotify.sock to a temporary socket but'
715 367 ' .hg/inotify.sock already exists'))
716 368 raise
717 369 else:
718 370 raise
719 371 self.sock.listen(5)
720 372 self.fileno = self.sock.fileno
721 self.register(timeout=timeout)
722
723 def handle_timeout(self):
724 pass
725 373
726 374 def answer_stat_query(self, cs):
727 375 names = cs.read().split('\0')
728 376
729 377 states = names.pop()
730 378
731 379 self.ui.note(_('answering query for %r\n') % states)
732 380
733 if self.repowatcher.timeout:
734 # We got a query while a rescan is pending. Make sure we
735 # rescan before responding, or we could give back a wrong
736 # answer.
737 self.repowatcher.handle_timeout()
738
739 381 visited = set()
740 382 if not names:
741 383 def genresult(states, tree):
742 384 for fn, state in tree.walk(states):
743 385 yield fn
744 386 else:
745 387 def genresult(states, tree):
746 388 for fn in names:
747 389 for f in tree.lookup(states, fn, visited):
748 390 yield f
749 391
750 392 return ['\0'.join(r) for r in [
751 393 genresult('l', self.repowatcher.statustrees['l']),
752 394 genresult('m', self.repowatcher.statustrees['m']),
753 395 genresult('a', self.repowatcher.statustrees['a']),
754 396 genresult('r', self.repowatcher.statustrees['r']),
755 397 genresult('!', self.repowatcher.statustrees['!']),
756 398 '?' in states
757 399 and genresult('?', self.repowatcher.statustrees['?'])
758 400 or [],
759 401 [],
760 402 'c' in states and genresult('n', self.repowatcher.tree) or [],
761 403 visited
762 404 ]]
763 405
764 406 def answer_dbug_query(self):
765 407 return ['\0'.join(self.repowatcher.debug())]
766 408
767 def handle_pollevents(self, events):
768 for e in events:
769 self.handle_pollevent()
770
771 def handle_pollevent(self):
409 def accept_connection(self):
772 410 sock, addr = self.sock.accept()
773 411
774 412 cs = common.recvcs(sock)
775 413 version = ord(cs.read(1))
776 414
777 415 if version != common.version:
778 416 self.ui.warn(_('received query from incompatible client '
779 417 'version %d\n') % version)
780 418 try:
781 419 # try to send back our version to the client
782 420 # this way, the client too is informed of the mismatch
783 421 sock.sendall(chr(common.version))
784 422 except:
785 423 pass
786 424 return
787 425
788 426 type = cs.read(4)
789 427
790 428 if type == 'STAT':
791 429 results = self.answer_stat_query(cs)
792 430 elif type == 'DBUG':
793 431 results = self.answer_dbug_query()
794 432 else:
795 433 self.ui.warn(_('unrecognized query type: %s\n') % type)
796 434 return
797 435
798 436 try:
799 437 try:
800 438 v = chr(common.version)
801 439
802 440 sock.sendall(v + type + struct.pack(common.resphdrfmts[type],
803 441 *map(len, results)))
804 442 sock.sendall(''.join(results))
805 443 finally:
806 444 sock.shutdown(socket.SHUT_WR)
807 445 except socket.error, err:
808 446 if err[0] != errno.EPIPE:
809 447 raise
810 448
811 def shutdown(self):
812 self.sock.close()
813 try:
814 os.unlink(self.sockpath)
815 if self.realsockpath:
816 os.unlink(self.realsockpath)
817 os.rmdir(os.path.dirname(self.realsockpath))
818 except OSError, err:
819 if err.errno != errno.ENOENT:
820 raise
449 if sys.platform == 'linux2':
450 import linuxserver as _server
451 else:
452 raise ImportError
821 453
822 class master(object):
823 def __init__(self, ui, dirstate, root, timeout=None):
824 self.ui = ui
825 self.repowatcher = repowatcher(ui, dirstate, root)
826 self.server = server(ui, root, self.repowatcher, timeout)
827
828 def shutdown(self):
829 for obj in pollable.instances.itervalues():
830 obj.shutdown()
831
832 def run(self):
833 self.repowatcher.setup()
834 self.ui.note(_('finished setup\n'))
835 if os.getenv('TIME_STARTUP'):
836 sys.exit(0)
837 pollable.run()
454 master = _server.master
838 455
839 456 def start(ui, dirstate, root, opts):
840 457 timeout = opts.get('timeout')
841 458 if timeout:
842 459 timeout = float(timeout) * 1e3
843 460
844 461 class service(object):
845 462 def init(self):
846 463 try:
847 464 self.master = master(ui, dirstate, root, timeout)
848 465 except AlreadyStartedException, inst:
849 466 raise util.Abort("inotify-server: %s" % inst)
850 467
851 468 def run(self):
852 469 try:
853 470 self.master.run()
854 471 finally:
855 472 self.master.shutdown()
856 473
857 474 if 'inserve' not in sys.argv:
858 475 runargs = [sys.argv[0], 'inserve', '-R', root]
859 476 else:
860 477 runargs = sys.argv[:]
861 478
862 479 pidfile = ui.config('inotify', 'pidfile')
863 480 if opts['daemon'] and pidfile is not None and 'pid-file' not in runargs:
864 481 runargs.append("--pid-file=%s" % pidfile)
865 482
866 483 service = service()
867 484 logfile = ui.config('inotify', 'log')
868 485 cmdutil.service(opts, initfn=service.init, runfn=service.run,
869 486 logfile=logfile, runargs=runargs)
General Comments 0
You need to be logged in to leave comments. Login now