##// END OF EJS Templates
chgserver: add separate flag to remember if stdio fds are replaced...
Yuya Nishihara -
r39774:a93fe297 default
parent child Browse files
Show More
@@ -1,598 +1,600 b''
1 1 # chgserver.py - command server extension for cHg
2 2 #
3 3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """command server extension for cHg
9 9
10 10 'S' channel (read/write)
11 11 propagate ui.system() request to client
12 12
13 13 'attachio' command
14 14 attach client's stdio passed by sendmsg()
15 15
16 16 'chdir' command
17 17 change current directory
18 18
19 19 'setenv' command
20 20 replace os.environ completely
21 21
22 22 'setumask' command
23 23 set umask
24 24
25 25 'validate' command
26 26 reload the config and check if the server is up to date
27 27
28 28 Config
29 29 ------
30 30
31 31 ::
32 32
33 33 [chgserver]
34 34 # how long (in seconds) should an idle chg server exit
35 35 idletimeout = 3600
36 36
37 37 # whether to skip config or env change checks
38 38 skiphash = False
39 39 """
40 40
41 41 from __future__ import absolute_import
42 42
43 43 import hashlib
44 44 import inspect
45 45 import os
46 46 import re
47 47 import socket
48 48 import stat
49 49 import struct
50 50 import time
51 51
52 52 from .i18n import _
53 53
54 54 from . import (
55 55 commandserver,
56 56 encoding,
57 57 error,
58 58 extensions,
59 59 node,
60 60 pycompat,
61 61 util,
62 62 )
63 63
64 64 from .utils import (
65 65 procutil,
66 66 )
67 67
68 68 _log = commandserver.log
69 69
def _hashlist(items):
    """Return the hex SHA-1 of the string form of ``items``.

    ``items`` is converted with str() before hashing, so two values hash
    equally only when their string representations are identical.
    """
    digest = hashlib.sha1(str(items)).digest()
    return node.hex(digest)
73 73
# config sections whose complete contents are fed into confighash
_configsections = [
    'alias',       # affects global state commands.table
    'eol',         # uses setconfig('eol', ...)
    'extdiff',     # uisetup will register new commands
    'extensions',
]

# individual (section, name) config items that are fed into confighash
_configsectionitems = [
    ('commands', 'show.aliasprefix'),  # show.py reads it in extsetup
]
85 85
# names of the environment variables that are fed into confighash. the
# pattern is written in verbose mode (re.X) and anchored at both ends, so it
# only matches a complete variable name.
_envre = re.compile(r'''\A(?:
                    CHGHG
                    |HG(?:DEMANDIMPORT|EMITWARNINGS|MODULEPOLICY|PROF|RCPATH)?
                    |HG(?:ENCODING|PLAIN).*
                    |LANG(?:UAGE)?
                    |LC_.*
                    |LD_.*
                    |PATH
                    |PYTHON.*
                    |TERM(?:INFO)?
                    |TZ
                    )\Z''', re.X)
99 99
def _confighash(ui):
    """return a quick hash for detecting config/env changes

    confighash is built from two parts: a hash of the sensitive config
    (every section in _configsections plus each item in
    _configsectionitems) and a hash of the environment variables whose names
    match _envre. the first 6 hex digits of each part are concatenated.

    for chgserver, it is designed that once confighash changes, the server is
    not qualified to serve its client and should redirect the client to a new
    server. different from mtimehash, confighash change will not mark the
    server outdated and exit since the user can have different configs at the
    same time.
    """
    hashed = [ui.configitems(section) for section in _configsections]
    hashed.extend(ui.config(section, name)
                  for section, name in _configsectionitems)
    confpart = _hashlist(hashed)

    # If $CHGHG is set, the change to $HG should not trigger a new chg server
    skipped = {'HG'} if 'CHGHG' in encoding.environ else set()
    envpairs = []
    for name, value in encoding.environ.iteritems():
        if _envre.match(name) and name not in skipped:
            envpairs.append((name, value))
    # sort so that the hash does not depend on iteration order
    envpairs.sort()
    envpart = _hashlist(envpairs)
    return confpart[:6] + envpart[:6]
126 126
def _getmtimepaths(ui):
    """get a sorted, duplicate-free list of paths to check for changes

    The list will include:
    - extensions (will not cover all files for complex extensions)
    - mercurial/__version__.py
    - python binary
    """
    modules = [mod for _name, mod in extensions.extensions(ui)]
    try:
        from . import __version__
    except ImportError:
        # no __version__ module available; nothing to track for it
        pass
    else:
        modules.append(__version__)

    paths = {pycompat.sysexecutable}
    for mod in modules:
        try:
            paths.add(inspect.getabsfile(mod))
        except TypeError:
            # inspect cannot tell the file of this module; skip it
            pass
    return sorted(paths)
148 148
def _mtimehash(paths):
    """return a quick hash for detecting file changes

    mtimehash calls stat on given paths and calculate a hash based on size and
    mtime of each file. mtimehash does not read file content because reading is
    expensive. therefore it's not 100% reliable for detecting content changes.
    it's possible to return different hashes for same file contents.
    it's also possible to return a same hash for different file contents for
    some carefully crafted situation.

    a path that cannot be stat-ed contributes None instead of an
    (mtime, size) pair. the result is the first 12 hex digits of the hash.

    for chgserver, it is designed that once mtimehash changes, the server is
    considered outdated immediately and should no longer provide service.

    mtimehash is not included in confighash because we only know the paths of
    extensions after importing them (there is imp.find_module but that faces
    race conditions). We need to calculate confighash without importing.
    """
    def trystat(path):
        try:
            st = os.stat(path)
        except OSError:
            # could be ENOENT, EPERM etc. not fatal in any case
            return None
        return (st[stat.ST_MTIME], st.st_size)

    # build a real list: _hashlist() hashes str(items), and on Python 3 a
    # map() object stringifies to its repr rather than to the stat results
    return _hashlist([trystat(path) for path in paths])[:12]
174 174
class hashstate(object):
    """a structure storing confighash, mtimehash, paths used for mtimehash"""

    def __init__(self, confighash, mtimehash, mtimepaths):
        self.confighash = confighash
        self.mtimehash = mtimehash
        self.mtimepaths = mtimepaths

    @staticmethod
    def fromui(ui, mtimepaths=None):
        """compute both hashes for ``ui`` and return them as a hashstate

        if ``mtimepaths`` is None, the paths are collected from ``ui`` by
        _getmtimepaths(); otherwise the given paths are used as is.
        """
        paths = _getmtimepaths(ui) if mtimepaths is None else mtimepaths
        confhash = _confighash(ui)
        mhash = _mtimehash(paths)
        _log('confighash = %s mtimehash = %s\n' % (confhash, mhash))
        return hashstate(confhash, mhash, paths)
190 190
def _newchgui(srcui, csystem, attachio):
    """return a copy of ``srcui`` whose class routes system calls to the client

    a subclass of ``srcui``'s class is created on the fly. it sends
    ui.system() and pager requests through ``csystem``; ``attachio`` is made
    available to the pager session as the 'attachio' subcommand.
    """
    class chgui(srcui.__class__):
        def __init__(self, src=None):
            super(chgui, self).__init__(src)
            # a copy keeps the channel of its source when the source has one
            if src:
                inherited = getattr(src, '_csystem', csystem)
            else:
                inherited = csystem
            self._csystem = inherited

        def _runsystem(self, cmd, environ, cwd, out):
            # the request is forwarded to the chg client only when the output
            # goes to our fout and fout is the real stdout. otherwise (the
            # output needs to be captured to self._buffers, or the stream is
            # e.g. stderr or cStringIO) fall back to the original system
            # method, because the chg client is not aware of these situations
            # and will behave differently (write to stdout).
            usechannel = (out is self.fout
                          and util.safehasattr(self.fout, 'fileno')
                          and self.fout.fileno() == procutil.stdout.fileno())
            if not usechannel:
                return procutil.system(cmd, environ=environ, cwd=cwd, out=out)
            self.flush()
            return self._csystem(cmd, procutil.shellenviron(environ), cwd)

        def _runpager(self, cmd, env=None):
            pagerenv = procutil.shellenviron(env)
            self._csystem(cmd, pagerenv, type='pager',
                          cmdtable={'attachio': attachio})
            return True

    return chgui(srcui)
218 218
def _loadnewui(srcui, args):
    """load a fresh ui for the command line ``args``, based on ``srcui``

    returns a ``(newui, newlui)`` pair, where ``newlui`` is the ui returned
    by dispatch._getlocal() for the --cwd / --repository given in ``args``.
    """
    from . import dispatch  # avoid cycle

    newui = srcui.__class__.load()
    # keep talking through the same streams and environment as srcui
    for attr in ['fin', 'fout', 'ferr', 'environ']:
        setattr(newui, attr, getattr(srcui, attr))
    if util.safehasattr(srcui, '_csystem'):
        newui._csystem = srcui._csystem

    # command line args
    options = dispatch._earlyparseopts(newui, args)
    dispatch._parseconfig(newui, options['config'])

    # stolen from tortoisehg.util.copydynamicconfig()
    for section, name, value in srcui.walkconfig():
        source = srcui.configsource(section, name)
        # path:line or command line, or environ: these are not copied
        isstatic = (':' in source or source == '--config'
                    or source.startswith('$'))
        if not isstatic:
            newui.setconfig(section, name, value, source)

    # load wd and repo config, copied from dispatch.py
    cwd = options['cwd']
    cwd = os.path.realpath(cwd) if cwd else None
    rpath = options['repository']
    _path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)

    return (newui, newlui)
247 247
class channeledsystem(object):
    r"""Propagate ui.system() request in the following format:

    payload length (unsigned int),
    type, '\0',
    cmd, '\0',
    cwd, '\0',
    envkey, '=', val, '\0',
    ...
    envkey, '=', val

    if type == 'system', waits for:

    exitcode length (unsigned int),
    exitcode (int)

    if type == 'pager', repetitively waits for a command name ending with '\n'
    and executes it defined by cmdtable, or exits the loop if the command name
    is empty.
    """
    def __init__(self, in_, out, channel):
        self.in_ = in_
        self.out = out
        self.channel = channel

    def __call__(self, cmd, environ, cwd=None, type='system', cmdtable=None):
        """send the request to the client and handle its response

        returns the exit code for type 'system' and None for type 'pager'.
        raises error.Abort if the response is malformed or truncated, or if
        the pager session requests a command not found in ``cmdtable``.
        raises error.ProgrammingError for any other ``type`` (after the
        request has been written).
        """
        args = [type, procutil.quotecommand(cmd), os.path.abspath(cwd or '.')]
        args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
        data = '\0'.join(args)
        self.out.write(struct.pack('>cI', self.channel, len(data)))
        self.out.write(data)
        self.out.flush()

        if type == 'system':
            header = self.in_.read(4)
            # a short read means the peer went away; report it the same way
            # as any other malformed response instead of leaking struct.error
            if len(header) != 4:
                raise error.Abort(_('invalid response'))
            length, = struct.unpack('>I', header)
            if length != 4:
                raise error.Abort(_('invalid response'))
            payload = self.in_.read(4)
            if len(payload) != 4:
                raise error.Abort(_('invalid response'))
            rc, = struct.unpack('>i', payload)
            return rc
        elif type == 'pager':
            while True:
                # strip the trailing '\n'; an empty name ends the session
                cmd = self.in_.readline()[:-1]
                if not cmd:
                    break
                if cmdtable and cmd in cmdtable:
                    _log('pager subcommand: %s\n' % cmd)
                    cmdtable[cmd]()
                else:
                    raise error.Abort(_('unexpected command: %s') % cmd)
        else:
            raise error.ProgrammingError('invalid S channel type: %s' % type)
300 300
# one entry per stdio stream, in the order the client passes its fds:
# (attribute name on the server, attribute name on the ui, fdopen mode)
_iochannels = [
    ('cin', 'fin', r'rb'),
    ('cout', 'fout', r'wb'),
    ('cerr', 'ferr', r'wb'),
]
307 307
class chgcmdserver(commandserver.server):
    """command server extended with the chg-specific commands

    in addition to the capabilities of commandserver.server, it provides
    'attachio', 'chdir', 'setenv', 'setumask', optionally 'setprocname', and
    (when a hashstate is given) 'validate'.
    """
    def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
        csystem = channeledsystem(fin, fout, 'S')
        super(chgcmdserver, self).__init__(
            _newchgui(ui, csystem, self.attachio), repo, fin, fout)
        self.clientsock = sock
        # whether the stdio fds are currently replaced with the client's
        self._ioattached = False
        self._oldios = []  # original (self.ch, ui.fp, fd) before "attachio"
        self.hashstate = hashstate
        self.baseaddress = baseaddress
        if hashstate is not None:
            # 'validate' needs a hashstate, so it is only offered here; copy
            # first so that the class-level table stays untouched
            self.capabilities = self.capabilities.copy()
            self.capabilities['validate'] = chgcmdserver.validate

    def cleanup(self):
        super(chgcmdserver, self).cleanup()
        # dispatch._runcatch() does not flush outputs if exception is not
        # handled by dispatch._dispatch()
        self.ui.flush()
        self._restoreio()
        self._ioattached = False

    def attachio(self):
        """Attach to client's stdio passed via unix domain socket; all
        channels except cresult will no longer be used
        """
        # tell client to sendmsg() with 1-byte payload, which makes it
        # distinctive from "attachio\n" command consumed by client.read()
        self.clientsock.sendall(struct.pack('>cI', 'I', 1))
        clientfds = util.recvfds(self.clientsock.fileno())
        _log('received fds: %r\n' % clientfds)

        ui = self.ui
        ui.flush()
        self._saveio()
        firstattach = not self._ioattached
        for clientfd, (chname, fpname, mode) in zip(clientfds, _iochannels):
            assert clientfd > 0
            fp = getattr(ui, fpname)
            os.dup2(clientfd, fp.fileno())
            os.close(clientfd)
            if not firstattach:
                continue
            # reset buffering mode when client is first attached. as we want
            # to see output immediately on pager, the mode stays unchanged
            # when client re-attached. ferr is unchanged because it should
            # be unbuffered no matter if it is a tty or not.
            newfp = fp
            if fpname != 'ferr':
                # make it line buffered explicitly because the default is
                # decided on first write(), where fout could be a pager.
                # 1 means line buffered, -1 means system default
                bufsize = 1 if fp.isatty() else -1
                newfp = os.fdopen(fp.fileno(), mode, bufsize)
            setattr(ui, fpname, newfp)
            setattr(self, chname, newfp)

        self._ioattached = True
        self.cresult.write(struct.pack('>i', len(clientfds)))

    def _saveio(self):
        """remember the current channels, ui files and duplicated fds

        does nothing if they are already saved.
        """
        if self._oldios:
            return
        ui = self.ui
        for chname, fpname, _mode in _iochannels:
            ch = getattr(self, chname)
            fp = getattr(ui, fpname)
            savedfd = os.dup(fp.fileno())
            self._oldios.append((ch, fp, savedfd))

    def _restoreio(self):
        """put back everything remembered by _saveio() and forget it"""
        ui = self.ui
        for saved, channel in zip(self._oldios, _iochannels):
            ch, fp, savedfd = saved
            chname, fpname, _mode = channel
            newfp = getattr(ui, fpname)
            # close newfp while it's associated with client; otherwise it
            # would be closed when newfp is deleted
            if newfp is not fp:
                newfp.close()
            # restore original fd: fp is open again
            os.dup2(savedfd, fp.fileno())
            os.close(savedfd)
            setattr(self, chname, ch)
            setattr(ui, fpname, fp)
        del self._oldios[:]

    def validate(self):
        r"""Reload the config and check if the server is up to date

        Read a list of '\0' separated arguments.
        Write a non-empty list of '\0' separated instruction strings or '\0'
        if the list is empty.
        An instruction string could be either:
            - "unlink $path", the client should unlink the path to stop the
              outdated server.
            - "redirect $path", the client should attempt to connect to $path
              first. If it does not work, start a new server. It implies
              "reconnect".
            - "exit $n", the client should exit directly with code n.
              This may happen if we cannot parse the config.
            - "reconnect", the client should close the connection and
              reconnect.
        If neither "reconnect" nor "redirect" is included in the instruction
        list, the client can continue with this server after completing all
        the instructions.
        """
        from . import dispatch  # avoid cycle

        args = self._readlist()
        try:
            self.ui, lui = _loadnewui(self.ui, args)
        except error.ParseError as inst:
            dispatch._formatparse(self.ui.warn, inst)
            self.ui.flush()
            self.cresult.write('exit 255')
            return

        current = self.hashstate
        newhash = hashstate.fromui(lui, current.mtimepaths)
        insts = []
        if newhash.mtimehash != current.mtimehash:
            addr = _hashaddress(self.baseaddress, current.confighash)
            insts.append('unlink %s' % addr)
            # mtimehash is empty if one or more extensions fail to load.
            # to be compatible with hg, still serve the client this time.
            if current.mtimehash:
                insts.append('reconnect')
        if newhash.confighash != current.confighash:
            addr = _hashaddress(self.baseaddress, newhash.confighash)
            insts.append('redirect %s' % addr)
        _log('validate: %s\n' % insts)
        self.cresult.write('\0'.join(insts) or '\0')

    def chdir(self):
        """Change current directory

        Note that the behavior of --cwd option is bit different from this.
        It does not affect --config parameter.
        """
        path = self._readstr()
        if path:
            _log('chdir to %r\n' % path)
            os.chdir(path)

    def setumask(self):
        """Change umask"""
        mask, = struct.unpack('>I', self._read(4))
        _log('setumask %r\n' % mask)
        os.umask(mask)

    def runcommand(self):
        return super(chgcmdserver, self).runcommand()

    def setenv(self):
        """Clear and update os.environ

        Note that not all variables can make an effect on the running process.
        """
        entries = self._readlist()
        try:
            newenv = dict(entry.split('=', 1) for entry in entries)
        except ValueError:
            # an entry without '=' cannot be turned into a (key, value) pair
            raise ValueError('unexpected value in setenv request')
        _log('setenv: %r\n' % sorted(newenv.keys()))
        encoding.environ.clear()
        encoding.environ.update(newenv)

    # command name -> handler; starts from the table of the base class
    capabilities = commandserver.server.capabilities.copy()
    capabilities['attachio'] = attachio
    capabilities['chdir'] = chdir
    capabilities['runcommand'] = runcommand
    capabilities['setenv'] = setenv
    capabilities['setumask'] = setumask

    # 'setprocname' is only offered where procutil supports it
    if util.safehasattr(procutil, 'setprocname'):
        def setprocname(self):
            """Change process title"""
            name = self._readstr()
            _log('setprocname: %r\n' % name)
            procutil.setprocname(name)
        capabilities['setprocname'] = setprocname
487 489
def _tempaddress(address):
    """return ``address`` suffixed with the current process id and '.tmp'"""
    pid = os.getpid()
    return '%s.%d.tmp' % (address, pid)
490 492
def _hashaddress(address, hashstr):
    """return ``address`` with its extension dropped and '-hashstr' appended"""
    # if the basename of address contains '.', use only the left part. this
    # makes it possible for the client to pass 'server.tmp$PID' and follow by
    # an atomic rename to avoid locking when spawning new servers.
    dirname, basename = os.path.split(address)
    stem = basename.partition('.')[0]
    prefix = os.path.join(dirname, stem)
    return '%s-%s' % (prefix, hashstr)
498 500
class chgunixservicehandler(object):
    """Set of operations for chg services"""

    pollinterval = 1  # [sec]

    def __init__(self, ui):
        self.ui = ui
        self._idletimeout = ui.configint('chgserver', 'idletimeout')
        self._lastactive = time.time()

    def bindsocket(self, sock, address):
        """bind ``sock`` for ``address`` and publish it under its final name"""
        self._inithashstate(address)
        self._checkextensions()
        self._bind(sock)
        self._createsymlink()
        # no "listening at" message should be printed to simulate hg behavior

    def _inithashstate(self, address):
        """decide the real socket address, hashed unless skiphash is set"""
        self._baseaddress = address
        if self.ui.configbool('chgserver', 'skiphash'):
            self._hashstate = None
            self._realaddress = address
        else:
            self._hashstate = hashstate.fromui(self.ui)
            self._realaddress = _hashaddress(address,
                                             self._hashstate.confighash)

    def _checkextensions(self):
        if self._hashstate and extensions.notloaded():
            # one or more extensions failed to load. mtimehash becomes
            # meaningless because we do not know the paths of those extensions.
            # set mtimehash to an illegal hash value to invalidate the server.
            self._hashstate.mtimehash = ''

    def _bind(self, sock):
        # use a unique temp address so we can stat the file and do ownership
        # check later
        tmppath = _tempaddress(self._realaddress)
        util.bindunixsocket(sock, tmppath)
        self._socketstat = os.stat(tmppath)
        sock.listen(socket.SOMAXCONN)
        # rename will replace the old socket file if exists atomically. the
        # old server will detect ownership change and exit.
        util.rename(tmppath, self._realaddress)

    def _createsymlink(self):
        """make the base address a symlink to the real (hashed) address"""
        if self._baseaddress != self._realaddress:
            tmppath = _tempaddress(self._baseaddress)
            os.symlink(os.path.basename(self._realaddress), tmppath)
            util.rename(tmppath, self._baseaddress)

    def _issocketowner(self):
        """check if the file at the real address is still the one we bound"""
        try:
            st = os.stat(self._realaddress)
        except OSError:
            return False
        bound = self._socketstat
        return (st.st_ino == bound.st_ino and
                st[stat.ST_MTIME] == bound[stat.ST_MTIME])

    def unlinksocket(self, address):
        if self._issocketowner():
            # it is possible to have a race condition here that we may
            # remove another server's socket file. but that's okay
            # since that server will detect and exit automatically and
            # the client will start a new server on demand.
            util.tryunlink(self._realaddress)

    def shouldexit(self):
        """tell if the server lost its socket or has been idle too long"""
        if not self._issocketowner():
            self.ui.debug('%s is not owned, exiting.\n' % self._realaddress)
            return True
        idle = time.time() - self._lastactive
        if idle > self._idletimeout:
            self.ui.debug('being idle too long. exiting.\n')
            return True
        return False

    def newconnection(self):
        # any new connection resets the idle timer
        self._lastactive = time.time()

    def createcmdserver(self, repo, conn, fin, fout):
        return chgcmdserver(self.ui, repo, fin, fout, conn,
                            self._hashstate, self._baseaddress)
584 586
585 587 def chgunixservice(ui, repo, opts):
586 588 # CHGINTERNALMARK is set by chg client. It is an indication of things are
587 589 # started by chg so other code can do things accordingly, like disabling
588 590 # demandimport or detecting chg client started by chg client. When executed
589 591 # here, CHGINTERNALMARK is no longer useful and hence dropped to make
590 592 # environ cleaner.
591 593 if 'CHGINTERNALMARK' in encoding.environ:
592 594 del encoding.environ['CHGINTERNALMARK']
593 595
594 596 if repo:
595 597 # one chgserver can serve multiple repos. drop repo information
596 598 ui.setconfig('bundle', 'mainreporoot', '', 'repo')
597 599 h = chgunixservicehandler(ui)
598 600 return commandserver.unixforkingservice(ui, repo=None, opts=opts, handler=h)
General Comments 0
You need to be logged in to leave comments. Login now