##// END OF EJS Templates
chgserver: restore pager fds attached within runcommand session...
Yuya Nishihara -
r39775:7cdd47d9 default
parent child Browse files
Show More
@@ -1,600 +1,609
1 1 # chgserver.py - command server extension for cHg
2 2 #
3 3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """command server extension for cHg
9 9
10 10 'S' channel (read/write)
11 11 propagate ui.system() request to client
12 12
13 13 'attachio' command
14 14 attach client's stdio passed by sendmsg()
15 15
16 16 'chdir' command
17 17 change current directory
18 18
19 19 'setenv' command
20 20 replace os.environ completely
21 21
22 22 'setumask' command
23 23 set umask
24 24
25 25 'validate' command
26 26 reload the config and check if the server is up to date
27 27
28 28 Config
29 29 ------
30 30
31 31 ::
32 32
33 33 [chgserver]
34 34 # how long (in seconds) should an idle chg server exit
35 35 idletimeout = 3600
36 36
37 37 # whether to skip config or env change checks
38 38 skiphash = False
39 39 """
40 40
41 41 from __future__ import absolute_import
42 42
43 43 import hashlib
44 44 import inspect
45 45 import os
46 46 import re
47 47 import socket
48 48 import stat
49 49 import struct
50 50 import time
51 51
52 52 from .i18n import _
53 53
54 54 from . import (
55 55 commandserver,
56 56 encoding,
57 57 error,
58 58 extensions,
59 59 node,
60 60 pycompat,
61 61 util,
62 62 )
63 63
64 64 from .utils import (
65 65 procutil,
66 66 )
67 67
68 68 _log = commandserver.log
69 69
70 70 def _hashlist(items):
71 71 """return sha1 hexdigest for a list"""
72 72 return node.hex(hashlib.sha1(str(items)).digest())
73 73
74 74 # sensitive config sections affecting confighash
75 75 _configsections = [
76 76 'alias', # affects global state commands.table
77 77 'eol', # uses setconfig('eol', ...)
78 78 'extdiff', # uisetup will register new commands
79 79 'extensions',
80 80 ]
81 81
82 82 _configsectionitems = [
83 83 ('commands', 'show.aliasprefix'), # show.py reads it in extsetup
84 84 ]
85 85
86 86 # sensitive environment variables affecting confighash
87 87 _envre = re.compile(r'''\A(?:
88 88 CHGHG
89 89 |HG(?:DEMANDIMPORT|EMITWARNINGS|MODULEPOLICY|PROF|RCPATH)?
90 90 |HG(?:ENCODING|PLAIN).*
91 91 |LANG(?:UAGE)?
92 92 |LC_.*
93 93 |LD_.*
94 94 |PATH
95 95 |PYTHON.*
96 96 |TERM(?:INFO)?
97 97 |TZ
98 98 )\Z''', re.X)
99 99
100 100 def _confighash(ui):
101 101 """return a quick hash for detecting config/env changes
102 102
103 103 confighash is the hash of sensitive config items and environment variables.
104 104
105 105 for chgserver, it is designed that once confighash changes, the server is
106 106 not qualified to serve its client and should redirect the client to a new
107 107 server. different from mtimehash, confighash change will not mark the
108 108 server outdated and exit since the user can have different configs at the
109 109 same time.
110 110 """
111 111 sectionitems = []
112 112 for section in _configsections:
113 113 sectionitems.append(ui.configitems(section))
114 114 for section, item in _configsectionitems:
115 115 sectionitems.append(ui.config(section, item))
116 116 sectionhash = _hashlist(sectionitems)
117 117 # If $CHGHG is set, the change to $HG should not trigger a new chg server
118 118 if 'CHGHG' in encoding.environ:
119 119 ignored = {'HG'}
120 120 else:
121 121 ignored = set()
122 122 envitems = [(k, v) for k, v in encoding.environ.iteritems()
123 123 if _envre.match(k) and k not in ignored]
124 124 envhash = _hashlist(sorted(envitems))
125 125 return sectionhash[:6] + envhash[:6]
126 126
127 127 def _getmtimepaths(ui):
128 128 """get a list of paths that should be checked to detect change
129 129
130 130 The list will include:
131 131 - extensions (will not cover all files for complex extensions)
132 132 - mercurial/__version__.py
133 133 - python binary
134 134 """
135 135 modules = [m for n, m in extensions.extensions(ui)]
136 136 try:
137 137 from . import __version__
138 138 modules.append(__version__)
139 139 except ImportError:
140 140 pass
141 141 files = [pycompat.sysexecutable]
142 142 for m in modules:
143 143 try:
144 144 files.append(inspect.getabsfile(m))
145 145 except TypeError:
146 146 pass
147 147 return sorted(set(files))
148 148
149 149 def _mtimehash(paths):
150 150 """return a quick hash for detecting file changes
151 151
152 152 mtimehash calls stat on given paths and calculate a hash based on size and
153 153 mtime of each file. mtimehash does not read file content because reading is
154 154 expensive. therefore it's not 100% reliable for detecting content changes.
155 155 it's possible to return different hashes for same file contents.
156 156 it's also possible to return a same hash for different file contents for
157 157 some carefully crafted situation.
158 158
159 159 for chgserver, it is designed that once mtimehash changes, the server is
160 160 considered outdated immediately and should no longer provide service.
161 161
162 162 mtimehash is not included in confighash because we only know the paths of
163 163 extensions after importing them (there is imp.find_module but that faces
164 164 race conditions). We need to calculate confighash without importing.
165 165 """
166 166 def trystat(path):
167 167 try:
168 168 st = os.stat(path)
169 169 return (st[stat.ST_MTIME], st.st_size)
170 170 except OSError:
171 171 # could be ENOENT, EPERM etc. not fatal in any case
172 172 pass
173 173 return _hashlist(map(trystat, paths))[:12]
174 174
175 175 class hashstate(object):
176 176 """a structure storing confighash, mtimehash, paths used for mtimehash"""
177 177 def __init__(self, confighash, mtimehash, mtimepaths):
178 178 self.confighash = confighash
179 179 self.mtimehash = mtimehash
180 180 self.mtimepaths = mtimepaths
181 181
182 182 @staticmethod
183 183 def fromui(ui, mtimepaths=None):
184 184 if mtimepaths is None:
185 185 mtimepaths = _getmtimepaths(ui)
186 186 confighash = _confighash(ui)
187 187 mtimehash = _mtimehash(mtimepaths)
188 188 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
189 189 return hashstate(confighash, mtimehash, mtimepaths)
190 190
191 191 def _newchgui(srcui, csystem, attachio):
192 192 class chgui(srcui.__class__):
193 193 def __init__(self, src=None):
194 194 super(chgui, self).__init__(src)
195 195 if src:
196 196 self._csystem = getattr(src, '_csystem', csystem)
197 197 else:
198 198 self._csystem = csystem
199 199
200 200 def _runsystem(self, cmd, environ, cwd, out):
201 201 # fallback to the original system method if the output needs to be
202 202 # captured (to self._buffers), or the output stream is not stdout
203 203 # (e.g. stderr, cStringIO), because the chg client is not aware of
204 204 # these situations and will behave differently (write to stdout).
205 205 if (out is not self.fout
206 206 or not util.safehasattr(self.fout, 'fileno')
207 207 or self.fout.fileno() != procutil.stdout.fileno()):
208 208 return procutil.system(cmd, environ=environ, cwd=cwd, out=out)
209 209 self.flush()
210 210 return self._csystem(cmd, procutil.shellenviron(environ), cwd)
211 211
212 212 def _runpager(self, cmd, env=None):
213 213 self._csystem(cmd, procutil.shellenviron(env), type='pager',
214 214 cmdtable={'attachio': attachio})
215 215 return True
216 216
217 217 return chgui(srcui)
218 218
219 219 def _loadnewui(srcui, args):
220 220 from . import dispatch # avoid cycle
221 221
222 222 newui = srcui.__class__.load()
223 223 for a in ['fin', 'fout', 'ferr', 'environ']:
224 224 setattr(newui, a, getattr(srcui, a))
225 225 if util.safehasattr(srcui, '_csystem'):
226 226 newui._csystem = srcui._csystem
227 227
228 228 # command line args
229 229 options = dispatch._earlyparseopts(newui, args)
230 230 dispatch._parseconfig(newui, options['config'])
231 231
232 232 # stolen from tortoisehg.util.copydynamicconfig()
233 233 for section, name, value in srcui.walkconfig():
234 234 source = srcui.configsource(section, name)
235 235 if ':' in source or source == '--config' or source.startswith('$'):
236 236 # path:line or command line, or environ
237 237 continue
238 238 newui.setconfig(section, name, value, source)
239 239
240 240 # load wd and repo config, copied from dispatch.py
241 241 cwd = options['cwd']
242 242 cwd = cwd and os.path.realpath(cwd) or None
243 243 rpath = options['repository']
244 244 path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)
245 245
246 246 return (newui, newlui)
247 247
248 248 class channeledsystem(object):
249 249 """Propagate ui.system() request in the following format:
250 250
251 251 payload length (unsigned int),
252 252 type, '\0',
253 253 cmd, '\0',
254 254 cwd, '\0',
255 255 envkey, '=', val, '\0',
256 256 ...
257 257 envkey, '=', val
258 258
259 259 if type == 'system', waits for:
260 260
261 261 exitcode length (unsigned int),
262 262 exitcode (int)
263 263
264 264 if type == 'pager', repetitively waits for a command name ending with '\n'
265 265 and executes it defined by cmdtable, or exits the loop if the command name
266 266 is empty.
267 267 """
268 268 def __init__(self, in_, out, channel):
269 269 self.in_ = in_
270 270 self.out = out
271 271 self.channel = channel
272 272
273 273 def __call__(self, cmd, environ, cwd=None, type='system', cmdtable=None):
274 274 args = [type, procutil.quotecommand(cmd), os.path.abspath(cwd or '.')]
275 275 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
276 276 data = '\0'.join(args)
277 277 self.out.write(struct.pack('>cI', self.channel, len(data)))
278 278 self.out.write(data)
279 279 self.out.flush()
280 280
281 281 if type == 'system':
282 282 length = self.in_.read(4)
283 283 length, = struct.unpack('>I', length)
284 284 if length != 4:
285 285 raise error.Abort(_('invalid response'))
286 286 rc, = struct.unpack('>i', self.in_.read(4))
287 287 return rc
288 288 elif type == 'pager':
289 289 while True:
290 290 cmd = self.in_.readline()[:-1]
291 291 if not cmd:
292 292 break
293 293 if cmdtable and cmd in cmdtable:
294 294 _log('pager subcommand: %s' % cmd)
295 295 cmdtable[cmd]()
296 296 else:
297 297 raise error.Abort(_('unexpected command: %s') % cmd)
298 298 else:
299 299 raise error.ProgrammingError('invalid S channel type: %s' % type)
300 300
301 301 _iochannels = [
302 302 # server.ch, ui.fp, mode
303 303 ('cin', 'fin', r'rb'),
304 304 ('cout', 'fout', r'wb'),
305 305 ('cerr', 'ferr', r'wb'),
306 306 ]
307 307
308 308 class chgcmdserver(commandserver.server):
309 309 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
310 310 super(chgcmdserver, self).__init__(
311 311 _newchgui(ui, channeledsystem(fin, fout, 'S'), self.attachio),
312 312 repo, fin, fout)
313 313 self.clientsock = sock
314 314 self._ioattached = False
315 315 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
316 316 self.hashstate = hashstate
317 317 self.baseaddress = baseaddress
318 318 if hashstate is not None:
319 319 self.capabilities = self.capabilities.copy()
320 320 self.capabilities['validate'] = chgcmdserver.validate
321 321
322 322 def cleanup(self):
323 323 super(chgcmdserver, self).cleanup()
324 324 # dispatch._runcatch() does not flush outputs if exception is not
325 325 # handled by dispatch._dispatch()
326 326 self.ui.flush()
327 327 self._restoreio()
328 328 self._ioattached = False
329 329
330 330 def attachio(self):
331 331 """Attach to client's stdio passed via unix domain socket; all
332 332 channels except cresult will no longer be used
333 333 """
334 334 # tell client to sendmsg() with 1-byte payload, which makes it
335 335 # distinctive from "attachio\n" command consumed by client.read()
336 336 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
337 337 clientfds = util.recvfds(self.clientsock.fileno())
338 338 _log('received fds: %r\n' % clientfds)
339 339
340 340 ui = self.ui
341 341 ui.flush()
342 342 self._saveio()
343 343 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
344 344 assert fd > 0
345 345 fp = getattr(ui, fn)
346 346 os.dup2(fd, fp.fileno())
347 347 os.close(fd)
348 348 if self._ioattached:
349 349 continue
350 350 # reset buffering mode when client is first attached. as we want
351 351 # to see output immediately on pager, the mode stays unchanged
352 352 # when client re-attached. ferr is unchanged because it should
353 353 # be unbuffered no matter if it is a tty or not.
354 354 if fn == 'ferr':
355 355 newfp = fp
356 356 else:
357 357 # make it line buffered explicitly because the default is
358 358 # decided on first write(), where fout could be a pager.
359 359 if fp.isatty():
360 360 bufsize = 1 # line buffered
361 361 else:
362 362 bufsize = -1 # system default
363 363 newfp = os.fdopen(fp.fileno(), mode, bufsize)
364 364 setattr(ui, fn, newfp)
365 365 setattr(self, cn, newfp)
366 366
367 367 self._ioattached = True
368 368 self.cresult.write(struct.pack('>i', len(clientfds)))
369 369
370 370 def _saveio(self):
371 371 if self._oldios:
372 372 return
373 373 ui = self.ui
374 374 for cn, fn, _mode in _iochannels:
375 375 ch = getattr(self, cn)
376 376 fp = getattr(ui, fn)
377 377 fd = os.dup(fp.fileno())
378 378 self._oldios.append((ch, fp, fd))
379 379
380 380 def _restoreio(self):
381 381 ui = self.ui
382 382 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
383 383 newfp = getattr(ui, fn)
384 384 # close newfp while it's associated with client; otherwise it
385 385 # would be closed when newfp is deleted
386 386 if newfp is not fp:
387 387 newfp.close()
388 388 # restore original fd: fp is open again
389 389 os.dup2(fd, fp.fileno())
390 390 os.close(fd)
391 391 setattr(self, cn, ch)
392 392 setattr(ui, fn, fp)
393 393 del self._oldios[:]
394 394
395 395 def validate(self):
396 396 """Reload the config and check if the server is up to date
397 397
398 398 Read a list of '\0' separated arguments.
399 399 Write a non-empty list of '\0' separated instruction strings or '\0'
400 400 if the list is empty.
401 401 An instruction string could be either:
402 402 - "unlink $path", the client should unlink the path to stop the
403 403 outdated server.
404 404 - "redirect $path", the client should attempt to connect to $path
405 405 first. If it does not work, start a new server. It implies
406 406 "reconnect".
407 407 - "exit $n", the client should exit directly with code n.
408 408 This may happen if we cannot parse the config.
409 409 - "reconnect", the client should close the connection and
410 410 reconnect.
411 411 If neither "reconnect" nor "redirect" is included in the instruction
412 412 list, the client can continue with this server after completing all
413 413 the instructions.
414 414 """
415 415 from . import dispatch # avoid cycle
416 416
417 417 args = self._readlist()
418 418 try:
419 419 self.ui, lui = _loadnewui(self.ui, args)
420 420 except error.ParseError as inst:
421 421 dispatch._formatparse(self.ui.warn, inst)
422 422 self.ui.flush()
423 423 self.cresult.write('exit 255')
424 424 return
425 425 newhash = hashstate.fromui(lui, self.hashstate.mtimepaths)
426 426 insts = []
427 427 if newhash.mtimehash != self.hashstate.mtimehash:
428 428 addr = _hashaddress(self.baseaddress, self.hashstate.confighash)
429 429 insts.append('unlink %s' % addr)
430 430 # mtimehash is empty if one or more extensions fail to load.
431 431 # to be compatible with hg, still serve the client this time.
432 432 if self.hashstate.mtimehash:
433 433 insts.append('reconnect')
434 434 if newhash.confighash != self.hashstate.confighash:
435 435 addr = _hashaddress(self.baseaddress, newhash.confighash)
436 436 insts.append('redirect %s' % addr)
437 437 _log('validate: %s\n' % insts)
438 438 self.cresult.write('\0'.join(insts) or '\0')
439 439
440 440 def chdir(self):
441 441 """Change current directory
442 442
443 443 Note that the behavior of --cwd option is bit different from this.
444 444 It does not affect --config parameter.
445 445 """
446 446 path = self._readstr()
447 447 if not path:
448 448 return
449 449 _log('chdir to %r\n' % path)
450 450 os.chdir(path)
451 451
452 452 def setumask(self):
453 453 """Change umask"""
454 454 mask = struct.unpack('>I', self._read(4))[0]
455 455 _log('setumask %r\n' % mask)
456 456 os.umask(mask)
457 457
458 458 def runcommand(self):
459 return super(chgcmdserver, self).runcommand()
459 # pager may be attached within the runcommand session, which should
460 # be detached at the end of the session. otherwise the pager wouldn't
461 # receive EOF.
462 globaloldios = self._oldios
463 self._oldios = []
464 try:
465 return super(chgcmdserver, self).runcommand()
466 finally:
467 self._restoreio()
468 self._oldios = globaloldios
460 469
461 470 def setenv(self):
462 471 """Clear and update os.environ
463 472
464 473 Note that not all variables can make an effect on the running process.
465 474 """
466 475 l = self._readlist()
467 476 try:
468 477 newenv = dict(s.split('=', 1) for s in l)
469 478 except ValueError:
470 479 raise ValueError('unexpected value in setenv request')
471 480 _log('setenv: %r\n' % sorted(newenv.keys()))
472 481 encoding.environ.clear()
473 482 encoding.environ.update(newenv)
474 483
475 484 capabilities = commandserver.server.capabilities.copy()
476 485 capabilities.update({'attachio': attachio,
477 486 'chdir': chdir,
478 487 'runcommand': runcommand,
479 488 'setenv': setenv,
480 489 'setumask': setumask})
481 490
482 491 if util.safehasattr(procutil, 'setprocname'):
483 492 def setprocname(self):
484 493 """Change process title"""
485 494 name = self._readstr()
486 495 _log('setprocname: %r\n' % name)
487 496 procutil.setprocname(name)
488 497 capabilities['setprocname'] = setprocname
489 498
490 499 def _tempaddress(address):
491 500 return '%s.%d.tmp' % (address, os.getpid())
492 501
493 502 def _hashaddress(address, hashstr):
494 503 # if the basename of address contains '.', use only the left part. this
495 504 # makes it possible for the client to pass 'server.tmp$PID' and follow by
496 505 # an atomic rename to avoid locking when spawning new servers.
497 506 dirname, basename = os.path.split(address)
498 507 basename = basename.split('.', 1)[0]
499 508 return '%s-%s' % (os.path.join(dirname, basename), hashstr)
500 509
501 510 class chgunixservicehandler(object):
502 511 """Set of operations for chg services"""
503 512
504 513 pollinterval = 1 # [sec]
505 514
506 515 def __init__(self, ui):
507 516 self.ui = ui
508 517 self._idletimeout = ui.configint('chgserver', 'idletimeout')
509 518 self._lastactive = time.time()
510 519
511 520 def bindsocket(self, sock, address):
512 521 self._inithashstate(address)
513 522 self._checkextensions()
514 523 self._bind(sock)
515 524 self._createsymlink()
516 525 # no "listening at" message should be printed to simulate hg behavior
517 526
518 527 def _inithashstate(self, address):
519 528 self._baseaddress = address
520 529 if self.ui.configbool('chgserver', 'skiphash'):
521 530 self._hashstate = None
522 531 self._realaddress = address
523 532 return
524 533 self._hashstate = hashstate.fromui(self.ui)
525 534 self._realaddress = _hashaddress(address, self._hashstate.confighash)
526 535
527 536 def _checkextensions(self):
528 537 if not self._hashstate:
529 538 return
530 539 if extensions.notloaded():
531 540 # one or more extensions failed to load. mtimehash becomes
532 541 # meaningless because we do not know the paths of those extensions.
533 542 # set mtimehash to an illegal hash value to invalidate the server.
534 543 self._hashstate.mtimehash = ''
535 544
536 545 def _bind(self, sock):
537 546 # use a unique temp address so we can stat the file and do ownership
538 547 # check later
539 548 tempaddress = _tempaddress(self._realaddress)
540 549 util.bindunixsocket(sock, tempaddress)
541 550 self._socketstat = os.stat(tempaddress)
542 551 sock.listen(socket.SOMAXCONN)
543 552 # rename will replace the old socket file if exists atomically. the
544 553 # old server will detect ownership change and exit.
545 554 util.rename(tempaddress, self._realaddress)
546 555
547 556 def _createsymlink(self):
548 557 if self._baseaddress == self._realaddress:
549 558 return
550 559 tempaddress = _tempaddress(self._baseaddress)
551 560 os.symlink(os.path.basename(self._realaddress), tempaddress)
552 561 util.rename(tempaddress, self._baseaddress)
553 562
554 563 def _issocketowner(self):
555 564 try:
556 565 st = os.stat(self._realaddress)
557 566 return (st.st_ino == self._socketstat.st_ino and
558 567 st[stat.ST_MTIME] == self._socketstat[stat.ST_MTIME])
559 568 except OSError:
560 569 return False
561 570
562 571 def unlinksocket(self, address):
563 572 if not self._issocketowner():
564 573 return
565 574 # it is possible to have a race condition here that we may
566 575 # remove another server's socket file. but that's okay
567 576 # since that server will detect and exit automatically and
568 577 # the client will start a new server on demand.
569 578 util.tryunlink(self._realaddress)
570 579
571 580 def shouldexit(self):
572 581 if not self._issocketowner():
573 582 self.ui.debug('%s is not owned, exiting.\n' % self._realaddress)
574 583 return True
575 584 if time.time() - self._lastactive > self._idletimeout:
576 585 self.ui.debug('being idle too long. exiting.\n')
577 586 return True
578 587 return False
579 588
580 589 def newconnection(self):
581 590 self._lastactive = time.time()
582 591
583 592 def createcmdserver(self, repo, conn, fin, fout):
584 593 return chgcmdserver(self.ui, repo, fin, fout, conn,
585 594 self._hashstate, self._baseaddress)
586 595
587 596 def chgunixservice(ui, repo, opts):
588 597 # CHGINTERNALMARK is set by chg client. It is an indication of things are
589 598 # started by chg so other code can do things accordingly, like disabling
590 599 # demandimport or detecting chg client started by chg client. When executed
591 600 # here, CHGINTERNALMARK is no longer useful and hence dropped to make
592 601 # environ cleaner.
593 602 if 'CHGINTERNALMARK' in encoding.environ:
594 603 del encoding.environ['CHGINTERNALMARK']
595 604
596 605 if repo:
597 606 # one chgserver can serve multiple repos. drop repo information
598 607 ui.setconfig('bundle', 'mainreporoot', '', 'repo')
599 608 h = chgunixservicehandler(ui)
600 609 return commandserver.unixforkingservice(ui, repo=None, opts=opts, handler=h)
General Comments 0
You need to be logged in to leave comments. Login now