##// END OF EJS Templates
chgserver: pass hashstate and base server address to chgcmdserver...
Jun Wu -
r28328:e00e57d8 default
parent child Browse files
Show More
@@ -1,629 +1,634
1 1 # chgserver.py - command server extension for cHg
2 2 #
3 3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """command server extension for cHg (EXPERIMENTAL)
9 9
10 10 'S' channel (read/write)
11 11 propagate ui.system() request to client
12 12
13 13 'attachio' command
14 14 attach client's stdio passed by sendmsg()
15 15
16 16 'chdir' command
17 17 change current directory
18 18
19 19 'getpager' command
20 20 checks if pager is enabled and which pager should be executed
21 21
22 22 'setenv' command
23 23 replace os.environ completely
24 24
25 25 'setumask' command
26 26 set umask
27 27
28 28 'SIGHUP' signal
29 29 reload configuration files
30 30
31 31 Config
32 32 ------
33 33
34 34 ::
35 35
36 36 [chgserver]
37 37 idletimeout = 3600 # seconds, after which an idle server will exit
38 38 skiphash = False # whether to skip config or env change checks
39 39 """
40 40
41 41 from __future__ import absolute_import
42 42
43 43 import SocketServer
44 44 import errno
45 45 import inspect
46 46 import os
47 47 import re
48 48 import signal
49 49 import struct
50 50 import sys
51 51 import threading
52 52 import time
53 53 import traceback
54 54
55 55 from mercurial.i18n import _
56 56
57 57 from mercurial import (
58 58 cmdutil,
59 59 commands,
60 60 commandserver,
61 61 dispatch,
62 62 error,
63 63 extensions,
64 64 osutil,
65 65 util,
66 66 )
67 67
68 68 # Note for extension authors: ONLY specify testedwith = 'internal' for
69 69 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
70 70 # be specifying the version(s) of Mercurial they are tested with, or
71 71 # leave the attribute unspecified.
72 72 testedwith = 'internal'
73 73
74 74 _log = commandserver.log
75 75
76 76 def _hashlist(items):
77 77 """return sha1 hexdigest for a list"""
78 78 return util.sha1(str(items)).hexdigest()
79 79
80 80 # sensitive config sections affecting confighash
81 81 _configsections = ['extensions']
82 82
83 83 # sensitive environment variables affecting confighash
84 84 _envre = re.compile(r'''\A(?:
85 85 CHGHG
86 86 |HG.*
87 87 |LANG(?:UAGE)?
88 88 |LC_.*
89 89 |LD_.*
90 90 |PATH
91 91 |PYTHON.*
92 92 |TERM(?:INFO)?
93 93 |TZ
94 94 )\Z''', re.X)
95 95
96 96 def _confighash(ui):
97 97 """return a quick hash for detecting config/env changes
98 98
99 99 confighash is the hash of sensitive config items and environment variables.
100 100
101 101 for chgserver, it is designed that once confighash changes, the server is
102 102 not qualified to serve its client and should redirect the client to a new
103 103 server. different from mtimehash, confighash change will not mark the
104 104 server outdated and exit since the user can have different configs at the
105 105 same time.
106 106 """
107 107 sectionitems = []
108 108 for section in _configsections:
109 109 sectionitems.append(ui.configitems(section))
110 110 sectionhash = _hashlist(sectionitems)
111 111 envitems = [(k, v) for k, v in os.environ.iteritems() if _envre.match(k)]
112 112 envhash = _hashlist(sorted(envitems))
113 113 return sectionhash[:6] + envhash[:6]
114 114
115 115 def _getmtimepaths(ui):
116 116 """get a list of paths that should be checked to detect change
117 117
118 118 The list will include:
119 119 - extensions (will not cover all files for complex extensions)
120 120 - mercurial/__version__.py
121 121 - python binary
122 122 """
123 123 modules = [m for n, m in extensions.extensions(ui)]
124 124 try:
125 125 from mercurial import __version__
126 126 modules.append(__version__)
127 127 except ImportError:
128 128 pass
129 129 files = [sys.executable]
130 130 for m in modules:
131 131 try:
132 132 files.append(inspect.getabsfile(m))
133 133 except TypeError:
134 134 pass
135 135 return sorted(set(files))
136 136
137 137 def _mtimehash(paths):
138 138 """return a quick hash for detecting file changes
139 139
140 140 mtimehash calls stat on given paths and calculate a hash based on size and
141 141 mtime of each file. mtimehash does not read file content because reading is
142 142 expensive. therefore it's not 100% reliable for detecting content changes.
143 143 it's possible to return different hashes for same file contents.
144 144 it's also possible to return a same hash for different file contents for
145 145 some carefully crafted situation.
146 146
147 147 for chgserver, it is designed that once mtimehash changes, the server is
148 148 considered outdated immediately and should no longer provide service.
149 149 """
150 150 def trystat(path):
151 151 try:
152 152 st = os.stat(path)
153 153 return (st.st_mtime, st.st_size)
154 154 except OSError:
155 155 # could be ENOENT, EPERM etc. not fatal in any case
156 156 pass
157 157 return _hashlist(map(trystat, paths))[:12]
158 158
159 159 class hashstate(object):
160 160 """a structure storing confighash, mtimehash, paths used for mtimehash"""
161 161 def __init__(self, confighash, mtimehash, mtimepaths):
162 162 self.confighash = confighash
163 163 self.mtimehash = mtimehash
164 164 self.mtimepaths = mtimepaths
165 165
166 166 @staticmethod
167 167 def fromui(ui, mtimepaths=None):
168 168 if mtimepaths is None:
169 169 mtimepaths = _getmtimepaths(ui)
170 170 confighash = _confighash(ui)
171 171 mtimehash = _mtimehash(mtimepaths)
172 172 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
173 173 return hashstate(confighash, mtimehash, mtimepaths)
174 174
175 175 # copied from hgext/pager.py:uisetup()
176 176 def _setuppagercmd(ui, options, cmd):
177 177 if not ui.formatted():
178 178 return
179 179
180 180 p = ui.config("pager", "pager", os.environ.get("PAGER"))
181 181 usepager = False
182 182 always = util.parsebool(options['pager'])
183 183 auto = options['pager'] == 'auto'
184 184
185 185 if not p:
186 186 pass
187 187 elif always:
188 188 usepager = True
189 189 elif not auto:
190 190 usepager = False
191 191 else:
192 192 attended = ['annotate', 'cat', 'diff', 'export', 'glog', 'log', 'qdiff']
193 193 attend = ui.configlist('pager', 'attend', attended)
194 194 ignore = ui.configlist('pager', 'ignore')
195 195 cmds, _ = cmdutil.findcmd(cmd, commands.table)
196 196
197 197 for cmd in cmds:
198 198 var = 'attend-%s' % cmd
199 199 if ui.config('pager', var):
200 200 usepager = ui.configbool('pager', var)
201 201 break
202 202 if (cmd in attend or
203 203 (cmd not in ignore and not attend)):
204 204 usepager = True
205 205 break
206 206
207 207 if usepager:
208 208 ui.setconfig('ui', 'formatted', ui.formatted(), 'pager')
209 209 ui.setconfig('ui', 'interactive', False, 'pager')
210 210 return p
211 211
212 212 _envvarre = re.compile(r'\$[a-zA-Z_]+')
213 213
214 214 def _clearenvaliases(cmdtable):
215 215 """Remove stale command aliases referencing env vars; variable expansion
216 216 is done at dispatch.addaliases()"""
217 217 for name, tab in cmdtable.items():
218 218 cmddef = tab[0]
219 219 if (isinstance(cmddef, dispatch.cmdalias) and
220 220 not cmddef.definition.startswith('!') and # shell alias
221 221 _envvarre.search(cmddef.definition)):
222 222 del cmdtable[name]
223 223
224 224 def _newchgui(srcui, csystem):
225 225 class chgui(srcui.__class__):
226 226 def __init__(self, src=None):
227 227 super(chgui, self).__init__(src)
228 228 if src:
229 229 self._csystem = getattr(src, '_csystem', csystem)
230 230 else:
231 231 self._csystem = csystem
232 232
233 233 def system(self, cmd, environ=None, cwd=None, onerr=None,
234 234 errprefix=None):
235 235 # copied from mercurial/util.py:system()
236 236 self.flush()
237 237 def py2shell(val):
238 238 if val is None or val is False:
239 239 return '0'
240 240 if val is True:
241 241 return '1'
242 242 return str(val)
243 243 env = os.environ.copy()
244 244 if environ:
245 245 env.update((k, py2shell(v)) for k, v in environ.iteritems())
246 246 env['HG'] = util.hgexecutable()
247 247 rc = self._csystem(cmd, env, cwd)
248 248 if rc and onerr:
249 249 errmsg = '%s %s' % (os.path.basename(cmd.split(None, 1)[0]),
250 250 util.explainexit(rc)[0])
251 251 if errprefix:
252 252 errmsg = '%s: %s' % (errprefix, errmsg)
253 253 raise onerr(errmsg)
254 254 return rc
255 255
256 256 return chgui(srcui)
257 257
258 258 def _renewui(srcui, args=None):
259 259 if not args:
260 260 args = []
261 261
262 262 newui = srcui.__class__()
263 263 for a in ['fin', 'fout', 'ferr', 'environ']:
264 264 setattr(newui, a, getattr(srcui, a))
265 265 if util.safehasattr(srcui, '_csystem'):
266 266 newui._csystem = srcui._csystem
267 267
268 268 # load wd and repo config, copied from dispatch.py
269 269 cwds = dispatch._earlygetopt(['--cwd'], args)
270 270 cwd = cwds and os.path.realpath(cwds[-1]) or None
271 271 rpath = dispatch._earlygetopt(["-R", "--repository", "--repo"], args)
272 272 path, newui = dispatch._getlocal(newui, rpath, wd=cwd)
273 273
274 274 # internal config: extensions.chgserver
275 275 # copy it. it can only be overrided from command line.
276 276 newui.setconfig('extensions', 'chgserver',
277 277 srcui.config('extensions', 'chgserver'), '--config')
278 278
279 279 # command line args
280 280 dispatch._parseconfig(newui, dispatch._earlygetopt(['--config'], args))
281 281
282 282 # stolen from tortoisehg.util.copydynamicconfig()
283 283 for section, name, value in srcui.walkconfig():
284 284 source = srcui.configsource(section, name)
285 285 if ':' in source or source == '--config':
286 286 # path:line or command line
287 287 continue
288 288 if source == 'none':
289 289 # ui.configsource returns 'none' by default
290 290 source = ''
291 291 newui.setconfig(section, name, value, source)
292 292 return newui
293 293
294 294 class channeledsystem(object):
295 295 """Propagate ui.system() request in the following format:
296 296
297 297 payload length (unsigned int),
298 298 cmd, '\0',
299 299 cwd, '\0',
300 300 envkey, '=', val, '\0',
301 301 ...
302 302 envkey, '=', val
303 303
304 304 and waits:
305 305
306 306 exitcode length (unsigned int),
307 307 exitcode (int)
308 308 """
309 309 def __init__(self, in_, out, channel):
310 310 self.in_ = in_
311 311 self.out = out
312 312 self.channel = channel
313 313
314 314 def __call__(self, cmd, environ, cwd):
315 315 args = [util.quotecommand(cmd), cwd or '.']
316 316 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
317 317 data = '\0'.join(args)
318 318 self.out.write(struct.pack('>cI', self.channel, len(data)))
319 319 self.out.write(data)
320 320 self.out.flush()
321 321
322 322 length = self.in_.read(4)
323 323 length, = struct.unpack('>I', length)
324 324 if length != 4:
325 325 raise error.Abort(_('invalid response'))
326 326 rc, = struct.unpack('>i', self.in_.read(4))
327 327 return rc
328 328
329 329 _iochannels = [
330 330 # server.ch, ui.fp, mode
331 331 ('cin', 'fin', 'rb'),
332 332 ('cout', 'fout', 'wb'),
333 333 ('cerr', 'ferr', 'wb'),
334 334 ]
335 335
336 336 class chgcmdserver(commandserver.server):
337 def __init__(self, ui, repo, fin, fout, sock):
337 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
338 338 super(chgcmdserver, self).__init__(
339 339 _newchgui(ui, channeledsystem(fin, fout, 'S')), repo, fin, fout)
340 340 self.clientsock = sock
341 341 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
342 self.hashstate = hashstate
343 self.baseaddress = baseaddress
342 344
343 345 def cleanup(self):
344 346 # dispatch._runcatch() does not flush outputs if exception is not
345 347 # handled by dispatch._dispatch()
346 348 self.ui.flush()
347 349 self._restoreio()
348 350
349 351 def attachio(self):
350 352 """Attach to client's stdio passed via unix domain socket; all
351 353 channels except cresult will no longer be used
352 354 """
353 355 # tell client to sendmsg() with 1-byte payload, which makes it
354 356 # distinctive from "attachio\n" command consumed by client.read()
355 357 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
356 358 clientfds = osutil.recvfds(self.clientsock.fileno())
357 359 _log('received fds: %r\n' % clientfds)
358 360
359 361 ui = self.ui
360 362 ui.flush()
361 363 first = self._saveio()
362 364 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
363 365 assert fd > 0
364 366 fp = getattr(ui, fn)
365 367 os.dup2(fd, fp.fileno())
366 368 os.close(fd)
367 369 if not first:
368 370 continue
369 371 # reset buffering mode when client is first attached. as we want
370 372 # to see output immediately on pager, the mode stays unchanged
371 373 # when client re-attached. ferr is unchanged because it should
372 374 # be unbuffered no matter if it is a tty or not.
373 375 if fn == 'ferr':
374 376 newfp = fp
375 377 else:
376 378 # make it line buffered explicitly because the default is
377 379 # decided on first write(), where fout could be a pager.
378 380 if fp.isatty():
379 381 bufsize = 1 # line buffered
380 382 else:
381 383 bufsize = -1 # system default
382 384 newfp = os.fdopen(fp.fileno(), mode, bufsize)
383 385 setattr(ui, fn, newfp)
384 386 setattr(self, cn, newfp)
385 387
386 388 self.cresult.write(struct.pack('>i', len(clientfds)))
387 389
388 390 def _saveio(self):
389 391 if self._oldios:
390 392 return False
391 393 ui = self.ui
392 394 for cn, fn, _mode in _iochannels:
393 395 ch = getattr(self, cn)
394 396 fp = getattr(ui, fn)
395 397 fd = os.dup(fp.fileno())
396 398 self._oldios.append((ch, fp, fd))
397 399 return True
398 400
399 401 def _restoreio(self):
400 402 ui = self.ui
401 403 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
402 404 newfp = getattr(ui, fn)
403 405 # close newfp while it's associated with client; otherwise it
404 406 # would be closed when newfp is deleted
405 407 if newfp is not fp:
406 408 newfp.close()
407 409 # restore original fd: fp is open again
408 410 os.dup2(fd, fp.fileno())
409 411 os.close(fd)
410 412 setattr(self, cn, ch)
411 413 setattr(ui, fn, fp)
412 414 del self._oldios[:]
413 415
414 416 def chdir(self):
415 417 """Change current directory
416 418
417 419 Note that the behavior of --cwd option is bit different from this.
418 420 It does not affect --config parameter.
419 421 """
420 422 path = self._readstr()
421 423 if not path:
422 424 return
423 425 _log('chdir to %r\n' % path)
424 426 os.chdir(path)
425 427
426 428 def setumask(self):
427 429 """Change umask"""
428 430 mask = struct.unpack('>I', self._read(4))[0]
429 431 _log('setumask %r\n' % mask)
430 432 os.umask(mask)
431 433
432 434 def getpager(self):
433 435 """Read cmdargs and write pager command to r-channel if enabled
434 436
435 437 If pager isn't enabled, this writes '\0' because channeledoutput
436 438 does not allow to write empty data.
437 439 """
438 440 args = self._readlist()
439 441 try:
440 442 cmd, _func, args, options, _cmdoptions = dispatch._parse(self.ui,
441 443 args)
442 444 except (error.Abort, error.AmbiguousCommand, error.CommandError,
443 445 error.UnknownCommand):
444 446 cmd = None
445 447 options = {}
446 448 if not cmd or 'pager' not in options:
447 449 self.cresult.write('\0')
448 450 return
449 451
450 452 pagercmd = _setuppagercmd(self.ui, options, cmd)
451 453 if pagercmd:
452 454 self.cresult.write(pagercmd)
453 455 else:
454 456 self.cresult.write('\0')
455 457
456 458 def setenv(self):
457 459 """Clear and update os.environ
458 460
459 461 Note that not all variables can make an effect on the running process.
460 462 """
461 463 l = self._readlist()
462 464 try:
463 465 newenv = dict(s.split('=', 1) for s in l)
464 466 except ValueError:
465 467 raise ValueError('unexpected value in setenv request')
466 468
467 469 diffkeys = set(k for k in set(os.environ.keys() + newenv.keys())
468 470 if os.environ.get(k) != newenv.get(k))
469 471 _log('change env: %r\n' % sorted(diffkeys))
470 472
471 473 os.environ.clear()
472 474 os.environ.update(newenv)
473 475
474 476 if set(['HGPLAIN', 'HGPLAINEXCEPT']) & diffkeys:
475 477 # reload config so that ui.plain() takes effect
476 478 self.ui = _renewui(self.ui)
477 479
478 480 _clearenvaliases(commands.table)
479 481
480 482 capabilities = commandserver.server.capabilities.copy()
481 483 capabilities.update({'attachio': attachio,
482 484 'chdir': chdir,
483 485 'getpager': getpager,
484 486 'setenv': setenv,
485 487 'setumask': setumask})
486 488
487 489 # copied from mercurial/commandserver.py
488 490 class _requesthandler(SocketServer.StreamRequestHandler):
489 491 def handle(self):
490 492 # use a different process group from the master process, making this
491 493 # process pass kernel "is_current_pgrp_orphaned" check so signals like
492 494 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
493 495 os.setpgid(0, 0)
494 496 ui = self.server.ui
495 497 repo = self.server.repo
496 sv = chgcmdserver(ui, repo, self.rfile, self.wfile, self.connection)
498 sv = chgcmdserver(ui, repo, self.rfile, self.wfile, self.connection,
499 self.server.hashstate, self.server.baseaddress)
497 500 try:
498 501 try:
499 502 sv.serve()
500 503 # handle exceptions that may be raised by command server. most of
501 504 # known exceptions are caught by dispatch.
502 505 except error.Abort as inst:
503 506 ui.warn(_('abort: %s\n') % inst)
504 507 except IOError as inst:
505 508 if inst.errno != errno.EPIPE:
506 509 raise
507 510 except KeyboardInterrupt:
508 511 pass
509 512 finally:
510 513 sv.cleanup()
511 514 except: # re-raises
512 515 # also write traceback to error channel. otherwise client cannot
513 516 # see it because it is written to server's stderr by default.
514 517 traceback.print_exc(file=sv.cerr)
515 518 raise
516 519
517 520 def _tempaddress(address):
518 521 return '%s.%d.tmp' % (address, os.getpid())
519 522
520 523 def _hashaddress(address, hashstr):
521 524 return '%s-%s' % (address, hashstr)
522 525
523 526 class AutoExitMixIn: # use old-style to comply with SocketServer design
524 527 lastactive = time.time()
525 528 idletimeout = 3600 # default 1 hour
526 529
527 530 def startautoexitthread(self):
528 531 # note: the auto-exit check here is cheap enough to not use a thread,
529 532 # be done in serve_forever. however SocketServer is hook-unfriendly,
530 533 # you simply cannot hook serve_forever without copying a lot of code.
531 534 # besides, serve_forever's docstring suggests using thread.
532 535 thread = threading.Thread(target=self._autoexitloop)
533 536 thread.daemon = True
534 537 thread.start()
535 538
536 539 def _autoexitloop(self, interval=1):
537 540 while True:
538 541 time.sleep(interval)
539 542 if not self.issocketowner():
540 543 _log('%s is not owned, exiting.\n' % self.server_address)
541 544 break
542 545 if time.time() - self.lastactive > self.idletimeout:
543 546 _log('being idle too long. exiting.\n')
544 547 break
545 548 self.shutdown()
546 549
547 550 def process_request(self, request, address):
548 551 self.lastactive = time.time()
549 552 return SocketServer.ForkingMixIn.process_request(
550 553 self, request, address)
551 554
552 555 def server_bind(self):
553 556 # use a unique temp address so we can stat the file and do ownership
554 557 # check later
555 558 tempaddress = _tempaddress(self.server_address)
556 559 self.socket.bind(tempaddress)
557 560 self._socketstat = os.stat(tempaddress)
558 561 # rename will replace the old socket file if exists atomically. the
559 562 # old server will detect ownership change and exit.
560 563 util.rename(tempaddress, self.server_address)
561 564
562 565 def issocketowner(self):
563 566 try:
564 567 stat = os.stat(self.server_address)
565 568 return (stat.st_ino == self._socketstat.st_ino and
566 569 stat.st_mtime == self._socketstat.st_mtime)
567 570 except OSError:
568 571 return False
569 572
570 573 def unlinksocketfile(self):
571 574 if not self.issocketowner():
572 575 return
573 576 # it is possible to have a race condition here that we may
574 577 # remove another server's socket file. but that's okay
575 578 # since that server will detect and exit automatically and
576 579 # the client will start a new server on demand.
577 580 try:
578 581 os.unlink(self.server_address)
579 582 except OSError as exc:
580 583 if exc.errno != errno.ENOENT:
581 584 raise
582 585
583 586 class chgunixservice(commandserver.unixservice):
584 587 def init(self):
585 588 signal.signal(signal.SIGHUP, self._reloadconfig)
586 589 self._inithashstate()
587 590 class cls(AutoExitMixIn, SocketServer.ForkingMixIn,
588 591 SocketServer.UnixStreamServer):
589 592 ui = self.ui
590 593 repo = self.repo
594 hashstate = self.hashstate
595 baseaddress = self.baseaddress
591 596 self.server = cls(self.address, _requesthandler)
592 597 self.server.idletimeout = self.ui.configint(
593 598 'chgserver', 'idletimeout', self.server.idletimeout)
594 599 self.server.startautoexitthread()
595 600 self._createsymlink()
596 601 # avoid writing "listening at" message to stdout before attachio
597 602 # request, which calls setvbuf()
598 603
599 604 def _inithashstate(self):
600 605 self.baseaddress = self.address
601 606 if self.ui.configbool('chgserver', 'skiphash', False):
602 607 self.hashstate = None
603 608 return
604 609 self.hashstate = hashstate.fromui(self.ui)
605 610 self.address = _hashaddress(self.address, self.hashstate.confighash)
606 611
607 612 def _createsymlink(self):
608 613 if self.baseaddress == self.address:
609 614 return
610 615 tempaddress = _tempaddress(self.baseaddress)
611 616 os.symlink(self.address, tempaddress)
612 617 util.rename(tempaddress, self.baseaddress)
613 618
614 619 def _reloadconfig(self, signum, frame):
615 620 self.ui = self.server.ui = _renewui(self.ui)
616 621
617 622 def run(self):
618 623 try:
619 624 self.server.serve_forever()
620 625 finally:
621 626 self.server.unlinksocketfile()
622 627
623 628 def uisetup(ui):
624 629 commandserver._servicemap['chgunix'] = chgunixservice
625 630
626 631 # CHGINTERNALMARK is temporarily set by chg client to detect if chg will
627 632 # start another chg. drop it to avoid possible side effects.
628 633 if 'CHGINTERNALMARK' in os.environ:
629 634 del os.environ['CHGINTERNALMARK']
General Comments 0
You need to be logged in to leave comments. Login now