##// END OF EJS Templates
chgserver: auto exit after being idle for too long or lose the socket file...
Jun Wu -
r28223:0a853dc9 default
parent child Browse files
Show More
@@ -1,399 +1,474 b''
1 1 # chgserver.py - command server extension for cHg
2 2 #
3 3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """command server extension for cHg (EXPERIMENTAL)
9 9
10 10 'S' channel (read/write)
11 11 propagate ui.system() request to client
12 12
13 13 'attachio' command
14 14 attach client's stdio passed by sendmsg()
15 15
16 16 'chdir' command
17 17 change current directory
18 18
19 19 'getpager' command
20 20 checks if pager is enabled and which pager should be executed
21 21
22 22 'setenv' command
23 23 replace os.environ completely
24 24
25 25 'SIGHUP' signal
26 26 reload configuration files
27 27 """
28 28
29 29 from __future__ import absolute_import
30 30
31 31 import SocketServer
32 32 import errno
33 33 import os
34 34 import re
35 35 import signal
36 36 import struct
37 import threading
38 import time
37 39 import traceback
38 40
39 41 from mercurial.i18n import _
40 42
41 43 from mercurial import (
42 44 cmdutil,
43 45 commands,
44 46 commandserver,
45 47 dispatch,
46 48 error,
47 49 osutil,
48 50 util,
49 51 )
50 52
51 53 # Note for extension authors: ONLY specify testedwith = 'internal' for
52 54 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
53 55 # be specifying the version(s) of Mercurial they are tested with, or
54 56 # leave the attribute unspecified.
55 57 testedwith = 'internal'
56 58
57 59 _log = commandserver.log
58 60
59 61 # copied from hgext/pager.py:uisetup()
60 62 def _setuppagercmd(ui, options, cmd):
61 63 if not ui.formatted():
62 64 return
63 65
64 66 p = ui.config("pager", "pager", os.environ.get("PAGER"))
65 67 usepager = False
66 68 always = util.parsebool(options['pager'])
67 69 auto = options['pager'] == 'auto'
68 70
69 71 if not p:
70 72 pass
71 73 elif always:
72 74 usepager = True
73 75 elif not auto:
74 76 usepager = False
75 77 else:
76 78 attended = ['annotate', 'cat', 'diff', 'export', 'glog', 'log', 'qdiff']
77 79 attend = ui.configlist('pager', 'attend', attended)
78 80 ignore = ui.configlist('pager', 'ignore')
79 81 cmds, _ = cmdutil.findcmd(cmd, commands.table)
80 82
81 83 for cmd in cmds:
82 84 var = 'attend-%s' % cmd
83 85 if ui.config('pager', var):
84 86 usepager = ui.configbool('pager', var)
85 87 break
86 88 if (cmd in attend or
87 89 (cmd not in ignore and not attend)):
88 90 usepager = True
89 91 break
90 92
91 93 if usepager:
92 94 ui.setconfig('ui', 'formatted', ui.formatted(), 'pager')
93 95 ui.setconfig('ui', 'interactive', False, 'pager')
94 96 return p
95 97
96 98 _envvarre = re.compile(r'\$[a-zA-Z_]+')
97 99
98 100 def _clearenvaliases(cmdtable):
99 101 """Remove stale command aliases referencing env vars; variable expansion
100 102 is done at dispatch.addaliases()"""
101 103 for name, tab in cmdtable.items():
102 104 cmddef = tab[0]
103 105 if (isinstance(cmddef, dispatch.cmdalias) and
104 106 not cmddef.definition.startswith('!') and # shell alias
105 107 _envvarre.search(cmddef.definition)):
106 108 del cmdtable[name]
107 109
108 110 def _newchgui(srcui, csystem):
109 111 class chgui(srcui.__class__):
110 112 def __init__(self, src=None):
111 113 super(chgui, self).__init__(src)
112 114 if src:
113 115 self._csystem = getattr(src, '_csystem', csystem)
114 116 else:
115 117 self._csystem = csystem
116 118
117 119 def system(self, cmd, environ=None, cwd=None, onerr=None,
118 120 errprefix=None):
119 121 # copied from mercurial/util.py:system()
120 122 self.flush()
121 123 def py2shell(val):
122 124 if val is None or val is False:
123 125 return '0'
124 126 if val is True:
125 127 return '1'
126 128 return str(val)
127 129 env = os.environ.copy()
128 130 if environ:
129 131 env.update((k, py2shell(v)) for k, v in environ.iteritems())
130 132 env['HG'] = util.hgexecutable()
131 133 rc = self._csystem(cmd, env, cwd)
132 134 if rc and onerr:
133 135 errmsg = '%s %s' % (os.path.basename(cmd.split(None, 1)[0]),
134 136 util.explainexit(rc)[0])
135 137 if errprefix:
136 138 errmsg = '%s: %s' % (errprefix, errmsg)
137 139 raise onerr(errmsg)
138 140 return rc
139 141
140 142 return chgui(srcui)
141 143
142 144 def _renewui(srcui):
143 145 newui = srcui.__class__()
144 146 for a in ['fin', 'fout', 'ferr', 'environ']:
145 147 setattr(newui, a, getattr(srcui, a))
146 148 if util.safehasattr(srcui, '_csystem'):
147 149 newui._csystem = srcui._csystem
148 150 # stolen from tortoisehg.util.copydynamicconfig()
149 151 for section, name, value in srcui.walkconfig():
150 152 source = srcui.configsource(section, name)
151 153 if ':' in source:
152 154 # path:line
153 155 continue
154 156 if source == 'none':
155 157 # ui.configsource returns 'none' by default
156 158 source = ''
157 159 newui.setconfig(section, name, value, source)
158 160 return newui
159 161
160 162 class channeledsystem(object):
161 163 """Propagate ui.system() request in the following format:
162 164
163 165 payload length (unsigned int),
164 166 cmd, '\0',
165 167 cwd, '\0',
166 168 envkey, '=', val, '\0',
167 169 ...
168 170 envkey, '=', val
169 171
170 172 and waits:
171 173
172 174 exitcode length (unsigned int),
173 175 exitcode (int)
174 176 """
175 177 def __init__(self, in_, out, channel):
176 178 self.in_ = in_
177 179 self.out = out
178 180 self.channel = channel
179 181
180 182 def __call__(self, cmd, environ, cwd):
181 183 args = [util.quotecommand(cmd), cwd or '.']
182 184 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
183 185 data = '\0'.join(args)
184 186 self.out.write(struct.pack('>cI', self.channel, len(data)))
185 187 self.out.write(data)
186 188 self.out.flush()
187 189
188 190 length = self.in_.read(4)
189 191 length, = struct.unpack('>I', length)
190 192 if length != 4:
191 193 raise error.Abort(_('invalid response'))
192 194 rc, = struct.unpack('>i', self.in_.read(4))
193 195 return rc
194 196
195 197 _iochannels = [
196 198 # server.ch, ui.fp, mode
197 199 ('cin', 'fin', 'rb'),
198 200 ('cout', 'fout', 'wb'),
199 201 ('cerr', 'ferr', 'wb'),
200 202 ]
201 203
202 204 class chgcmdserver(commandserver.server):
203 205 def __init__(self, ui, repo, fin, fout, sock):
204 206 super(chgcmdserver, self).__init__(
205 207 _newchgui(ui, channeledsystem(fin, fout, 'S')), repo, fin, fout)
206 208 self.clientsock = sock
207 209 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
208 210
209 211 def cleanup(self):
210 212 # dispatch._runcatch() does not flush outputs if exception is not
211 213 # handled by dispatch._dispatch()
212 214 self.ui.flush()
213 215 self._restoreio()
214 216
215 217 def attachio(self):
216 218 """Attach to client's stdio passed via unix domain socket; all
217 219 channels except cresult will no longer be used
218 220 """
219 221 # tell client to sendmsg() with 1-byte payload, which makes it
220 222 # distinctive from "attachio\n" command consumed by client.read()
221 223 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
222 224 clientfds = osutil.recvfds(self.clientsock.fileno())
223 225 _log('received fds: %r\n' % clientfds)
224 226
225 227 ui = self.ui
226 228 ui.flush()
227 229 first = self._saveio()
228 230 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
229 231 assert fd > 0
230 232 fp = getattr(ui, fn)
231 233 os.dup2(fd, fp.fileno())
232 234 os.close(fd)
233 235 if not first:
234 236 continue
235 237 # reset buffering mode when client is first attached. as we want
236 238 # to see output immediately on pager, the mode stays unchanged
237 239 # when client re-attached. ferr is unchanged because it should
238 240 # be unbuffered no matter if it is a tty or not.
239 241 if fn == 'ferr':
240 242 newfp = fp
241 243 else:
242 244 # make it line buffered explicitly because the default is
243 245 # decided on first write(), where fout could be a pager.
244 246 if fp.isatty():
245 247 bufsize = 1 # line buffered
246 248 else:
247 249 bufsize = -1 # system default
248 250 newfp = os.fdopen(fp.fileno(), mode, bufsize)
249 251 setattr(ui, fn, newfp)
250 252 setattr(self, cn, newfp)
251 253
252 254 self.cresult.write(struct.pack('>i', len(clientfds)))
253 255
254 256 def _saveio(self):
255 257 if self._oldios:
256 258 return False
257 259 ui = self.ui
258 260 for cn, fn, _mode in _iochannels:
259 261 ch = getattr(self, cn)
260 262 fp = getattr(ui, fn)
261 263 fd = os.dup(fp.fileno())
262 264 self._oldios.append((ch, fp, fd))
263 265 return True
264 266
265 267 def _restoreio(self):
266 268 ui = self.ui
267 269 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
268 270 newfp = getattr(ui, fn)
269 271 # close newfp while it's associated with client; otherwise it
270 272 # would be closed when newfp is deleted
271 273 if newfp is not fp:
272 274 newfp.close()
273 275 # restore original fd: fp is open again
274 276 os.dup2(fd, fp.fileno())
275 277 os.close(fd)
276 278 setattr(self, cn, ch)
277 279 setattr(ui, fn, fp)
278 280 del self._oldios[:]
279 281
280 282 def chdir(self):
281 283 """Change current directory
282 284
283 285 Note that the behavior of --cwd option is bit different from this.
284 286 It does not affect --config parameter.
285 287 """
286 288 path = self._readstr()
287 289 if not path:
288 290 return
289 291 _log('chdir to %r\n' % path)
290 292 os.chdir(path)
291 293
292 294 def setumask(self):
293 295 """Change umask"""
294 296 mask = struct.unpack('>I', self._read(4))[0]
295 297 _log('setumask %r\n' % mask)
296 298 os.umask(mask)
297 299
298 300 def getpager(self):
299 301 """Read cmdargs and write pager command to r-channel if enabled
300 302
301 303 If pager isn't enabled, this writes '\0' because channeledoutput
302 304 does not allow to write empty data.
303 305 """
304 306 args = self._readlist()
305 307 try:
306 308 cmd, _func, args, options, _cmdoptions = dispatch._parse(self.ui,
307 309 args)
308 310 except (error.Abort, error.AmbiguousCommand, error.CommandError,
309 311 error.UnknownCommand):
310 312 cmd = None
311 313 options = {}
312 314 if not cmd or 'pager' not in options:
313 315 self.cresult.write('\0')
314 316 return
315 317
316 318 pagercmd = _setuppagercmd(self.ui, options, cmd)
317 319 if pagercmd:
318 320 self.cresult.write(pagercmd)
319 321 else:
320 322 self.cresult.write('\0')
321 323
322 324 def setenv(self):
323 325 """Clear and update os.environ
324 326
325 327 Note that not all variables can make an effect on the running process.
326 328 """
327 329 l = self._readlist()
328 330 try:
329 331 newenv = dict(s.split('=', 1) for s in l)
330 332 except ValueError:
331 333 raise ValueError('unexpected value in setenv request')
332 334
333 335 diffkeys = set(k for k in set(os.environ.keys() + newenv.keys())
334 336 if os.environ.get(k) != newenv.get(k))
335 337 _log('change env: %r\n' % sorted(diffkeys))
336 338
337 339 os.environ.clear()
338 340 os.environ.update(newenv)
339 341
340 342 if set(['HGPLAIN', 'HGPLAINEXCEPT']) & diffkeys:
341 343 # reload config so that ui.plain() takes effect
342 344 self.ui = _renewui(self.ui)
343 345
344 346 _clearenvaliases(commands.table)
345 347
346 348 capabilities = commandserver.server.capabilities.copy()
347 349 capabilities.update({'attachio': attachio,
348 350 'chdir': chdir,
349 351 'getpager': getpager,
350 352 'setenv': setenv,
351 353 'setumask': setumask})
352 354
353 355 # copied from mercurial/commandserver.py
354 356 class _requesthandler(SocketServer.StreamRequestHandler):
355 357 def handle(self):
356 358 # use a different process group from the master process, making this
357 359 # process pass kernel "is_current_pgrp_orphaned" check so signals like
358 360 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
359 361 os.setpgid(0, 0)
360 362 ui = self.server.ui
361 363 repo = self.server.repo
362 364 sv = chgcmdserver(ui, repo, self.rfile, self.wfile, self.connection)
363 365 try:
364 366 try:
365 367 sv.serve()
366 368 # handle exceptions that may be raised by command server. most of
367 369 # known exceptions are caught by dispatch.
368 370 except error.Abort as inst:
369 371 ui.warn(_('abort: %s\n') % inst)
370 372 except IOError as inst:
371 373 if inst.errno != errno.EPIPE:
372 374 raise
373 375 except KeyboardInterrupt:
374 376 pass
375 377 finally:
376 378 sv.cleanup()
377 379 except: # re-raises
378 380 # also write traceback to error channel. otherwise client cannot
379 381 # see it because it is written to server's stderr by default.
380 382 traceback.print_exc(file=sv.cerr)
381 383 raise
382 384
385 def _tempaddress(address):
386 return '%s.%d.tmp' % (address, os.getpid())
387
388 class AutoExitMixIn: # use old-style to comply with SocketServer design
389 lastactive = time.time()
390 idletimeout = 3600 # default 1 hour
391
392 def startautoexitthread(self):
393 # note: the auto-exit check here is cheap enough to not use a thread,
394 # be done in serve_forever. however SocketServer is hook-unfriendly,
395 # you simply cannot hook serve_forever without copying a lot of code.
396 # besides, serve_forever's docstring suggests using thread.
397 thread = threading.Thread(target=self._autoexitloop)
398 thread.daemon = True
399 thread.start()
400
401 def _autoexitloop(self, interval=1):
402 while True:
403 time.sleep(interval)
404 if not self.issocketowner():
405 _log('%s is not owned, exiting.\n' % self.server_address)
406 break
407 if time.time() - self.lastactive > self.idletimeout:
408 _log('being idle too long. exiting.\n')
409 break
410 self.shutdown()
411
412 def process_request(self, request, address):
413 self.lastactive = time.time()
414 return SocketServer.ForkingMixIn.process_request(
415 self, request, address)
416
417 def server_bind(self):
418 # use a unique temp address so we can stat the file and do ownership
419 # check later
420 tempaddress = _tempaddress(self.server_address)
421 self.socket.bind(tempaddress)
422 self._socketstat = os.stat(tempaddress)
423 # rename will replace the old socket file if exists atomically. the
424 # old server will detect ownership change and exit.
425 util.rename(tempaddress, self.server_address)
426
427 def issocketowner(self):
428 try:
429 stat = os.stat(self.server_address)
430 return (stat.st_ino == self._socketstat.st_ino and
431 stat.st_mtime == self._socketstat.st_mtime)
432 except OSError:
433 return False
434
435 def unlinksocketfile(self):
436 if not self.issocketowner():
437 return
438 # it is possible to have a race condition here that we may
439 # remove another server's socket file. but that's okay
440 # since that server will detect and exit automatically and
441 # the client will start a new server on demand.
442 try:
443 os.unlink(self.server_address)
444 except OSError as exc:
445 if exc.errno != errno.ENOENT:
446 raise
447
383 448 class chgunixservice(commandserver.unixservice):
384 449 def init(self):
385 450 # drop options set for "hg serve --cmdserver" command
386 451 self.ui.setconfig('progress', 'assume-tty', None)
387 452 signal.signal(signal.SIGHUP, self._reloadconfig)
388 class cls(SocketServer.ForkingMixIn, SocketServer.UnixStreamServer):
453 class cls(AutoExitMixIn, SocketServer.ForkingMixIn,
454 SocketServer.UnixStreamServer):
389 455 ui = self.ui
390 456 repo = self.repo
391 457 self.server = cls(self.address, _requesthandler)
458 self.server.idletimeout = self.ui.configint(
459 'chgserver', 'idletimeout', self.server.idletimeout)
460 self.server.startautoexitthread()
392 461 # avoid writing "listening at" message to stdout before attachio
393 462 # request, which calls setvbuf()
394 463
395 464 def _reloadconfig(self, signum, frame):
396 465 self.ui = self.server.ui = _renewui(self.ui)
397 466
467 def run(self):
468 try:
469 self.server.serve_forever()
470 finally:
471 self.server.unlinksocketfile()
472
398 473 def uisetup(ui):
399 474 commandserver._servicemap['chgunix'] = chgunixservice
General Comments 0
You need to be logged in to leave comments. Login now