##// END OF EJS Templates
commandserver: extract function that serves for the current connection...
Yuya Nishihara -
r29543:d74b8a4f default
parent child Browse files
Show More
@@ -1,663 +1,662
1 1 # chgserver.py - command server extension for cHg
2 2 #
3 3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """command server extension for cHg (EXPERIMENTAL)
9 9
10 10 'S' channel (read/write)
11 11 propagate ui.system() request to client
12 12
13 13 'attachio' command
14 14 attach client's stdio passed by sendmsg()
15 15
16 16 'chdir' command
17 17 change current directory
18 18
19 19 'getpager' command
20 20 checks if pager is enabled and which pager should be executed
21 21
22 22 'setenv' command
23 23 replace os.environ completely
24 24
25 25 'setumask' command
26 26 set umask
27 27
28 28 'validate' command
29 29 reload the config and check if the server is up to date
30 30
31 31 Config
32 32 ------
33 33
34 34 ::
35 35
36 36 [chgserver]
37 37 idletimeout = 3600 # seconds, after which an idle server will exit
38 38 skiphash = False # whether to skip config or env change checks
39 39 """
40 40
41 41 from __future__ import absolute_import
42 42
43 43 import errno
44 44 import hashlib
45 45 import inspect
46 46 import os
47 47 import re
48 48 import signal
49 49 import struct
50 50 import sys
51 51 import threading
52 52 import time
53 53
54 54 from mercurial.i18n import _
55 55
56 56 from mercurial import (
57 57 cmdutil,
58 58 commands,
59 59 commandserver,
60 60 dispatch,
61 61 error,
62 62 extensions,
63 63 osutil,
64 64 util,
65 65 )
66 66
67 67 socketserver = util.socketserver
68 68
69 69 # Note for extension authors: ONLY specify testedwith = 'internal' for
70 70 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
71 71 # be specifying the version(s) of Mercurial they are tested with, or
72 72 # leave the attribute unspecified.
73 73 testedwith = 'internal'
74 74
75 75 _log = commandserver.log
76 76
77 77 def _hashlist(items):
78 78 """return sha1 hexdigest for a list"""
79 79 return hashlib.sha1(str(items)).hexdigest()
80 80
81 81 # sensitive config sections affecting confighash
82 82 _configsections = [
83 83 'alias', # affects global state commands.table
84 84 'extdiff', # uisetup will register new commands
85 85 'extensions',
86 86 ]
87 87
88 88 # sensitive environment variables affecting confighash
89 89 _envre = re.compile(r'''\A(?:
90 90 CHGHG
91 91 |HG.*
92 92 |LANG(?:UAGE)?
93 93 |LC_.*
94 94 |LD_.*
95 95 |PATH
96 96 |PYTHON.*
97 97 |TERM(?:INFO)?
98 98 |TZ
99 99 )\Z''', re.X)
100 100
101 101 def _confighash(ui):
102 102 """return a quick hash for detecting config/env changes
103 103
104 104 confighash is the hash of sensitive config items and environment variables.
105 105
106 106 for chgserver, it is designed that once confighash changes, the server is
107 107 not qualified to serve its client and should redirect the client to a new
108 108 server. different from mtimehash, confighash change will not mark the
109 109 server outdated and exit since the user can have different configs at the
110 110 same time.
111 111 """
112 112 sectionitems = []
113 113 for section in _configsections:
114 114 sectionitems.append(ui.configitems(section))
115 115 sectionhash = _hashlist(sectionitems)
116 116 envitems = [(k, v) for k, v in os.environ.iteritems() if _envre.match(k)]
117 117 envhash = _hashlist(sorted(envitems))
118 118 return sectionhash[:6] + envhash[:6]
119 119
120 120 def _getmtimepaths(ui):
121 121 """get a list of paths that should be checked to detect change
122 122
123 123 The list will include:
124 124 - extensions (will not cover all files for complex extensions)
125 125 - mercurial/__version__.py
126 126 - python binary
127 127 """
128 128 modules = [m for n, m in extensions.extensions(ui)]
129 129 try:
130 130 from mercurial import __version__
131 131 modules.append(__version__)
132 132 except ImportError:
133 133 pass
134 134 files = [sys.executable]
135 135 for m in modules:
136 136 try:
137 137 files.append(inspect.getabsfile(m))
138 138 except TypeError:
139 139 pass
140 140 return sorted(set(files))
141 141
142 142 def _mtimehash(paths):
143 143 """return a quick hash for detecting file changes
144 144
145 145 mtimehash calls stat on given paths and calculate a hash based on size and
146 146 mtime of each file. mtimehash does not read file content because reading is
147 147 expensive. therefore it's not 100% reliable for detecting content changes.
148 148 it's possible to return different hashes for same file contents.
149 149 it's also possible to return a same hash for different file contents for
150 150 some carefully crafted situation.
151 151
152 152 for chgserver, it is designed that once mtimehash changes, the server is
153 153 considered outdated immediately and should no longer provide service.
154 154
155 155 mtimehash is not included in confighash because we only know the paths of
156 156 extensions after importing them (there is imp.find_module but that faces
157 157 race conditions). We need to calculate confighash without importing.
158 158 """
159 159 def trystat(path):
160 160 try:
161 161 st = os.stat(path)
162 162 return (st.st_mtime, st.st_size)
163 163 except OSError:
164 164 # could be ENOENT, EPERM etc. not fatal in any case
165 165 pass
166 166 return _hashlist(map(trystat, paths))[:12]
167 167
168 168 class hashstate(object):
169 169 """a structure storing confighash, mtimehash, paths used for mtimehash"""
170 170 def __init__(self, confighash, mtimehash, mtimepaths):
171 171 self.confighash = confighash
172 172 self.mtimehash = mtimehash
173 173 self.mtimepaths = mtimepaths
174 174
175 175 @staticmethod
176 176 def fromui(ui, mtimepaths=None):
177 177 if mtimepaths is None:
178 178 mtimepaths = _getmtimepaths(ui)
179 179 confighash = _confighash(ui)
180 180 mtimehash = _mtimehash(mtimepaths)
181 181 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
182 182 return hashstate(confighash, mtimehash, mtimepaths)
183 183
184 184 # copied from hgext/pager.py:uisetup()
185 185 def _setuppagercmd(ui, options, cmd):
186 186 if not ui.formatted():
187 187 return
188 188
189 189 p = ui.config("pager", "pager", os.environ.get("PAGER"))
190 190 usepager = False
191 191 always = util.parsebool(options['pager'])
192 192 auto = options['pager'] == 'auto'
193 193
194 194 if not p:
195 195 pass
196 196 elif always:
197 197 usepager = True
198 198 elif not auto:
199 199 usepager = False
200 200 else:
201 201 attended = ['annotate', 'cat', 'diff', 'export', 'glog', 'log', 'qdiff']
202 202 attend = ui.configlist('pager', 'attend', attended)
203 203 ignore = ui.configlist('pager', 'ignore')
204 204 cmds, _ = cmdutil.findcmd(cmd, commands.table)
205 205
206 206 for cmd in cmds:
207 207 var = 'attend-%s' % cmd
208 208 if ui.config('pager', var):
209 209 usepager = ui.configbool('pager', var)
210 210 break
211 211 if (cmd in attend or
212 212 (cmd not in ignore and not attend)):
213 213 usepager = True
214 214 break
215 215
216 216 if usepager:
217 217 ui.setconfig('ui', 'formatted', ui.formatted(), 'pager')
218 218 ui.setconfig('ui', 'interactive', False, 'pager')
219 219 return p
220 220
221 221 def _newchgui(srcui, csystem):
222 222 class chgui(srcui.__class__):
223 223 def __init__(self, src=None):
224 224 super(chgui, self).__init__(src)
225 225 if src:
226 226 self._csystem = getattr(src, '_csystem', csystem)
227 227 else:
228 228 self._csystem = csystem
229 229
230 230 def system(self, cmd, environ=None, cwd=None, onerr=None,
231 231 errprefix=None):
232 232 # fallback to the original system method if the output needs to be
233 233 # captured (to self._buffers), or the output stream is not stdout
234 234 # (e.g. stderr, cStringIO), because the chg client is not aware of
235 235 # these situations and will behave differently (write to stdout).
236 236 if (any(s[1] for s in self._bufferstates)
237 237 or not util.safehasattr(self.fout, 'fileno')
238 238 or self.fout.fileno() != sys.stdout.fileno()):
239 239 return super(chgui, self).system(cmd, environ, cwd, onerr,
240 240 errprefix)
241 241 # copied from mercurial/util.py:system()
242 242 self.flush()
243 243 def py2shell(val):
244 244 if val is None or val is False:
245 245 return '0'
246 246 if val is True:
247 247 return '1'
248 248 return str(val)
249 249 env = os.environ.copy()
250 250 if environ:
251 251 env.update((k, py2shell(v)) for k, v in environ.iteritems())
252 252 env['HG'] = util.hgexecutable()
253 253 rc = self._csystem(cmd, env, cwd)
254 254 if rc and onerr:
255 255 errmsg = '%s %s' % (os.path.basename(cmd.split(None, 1)[0]),
256 256 util.explainexit(rc)[0])
257 257 if errprefix:
258 258 errmsg = '%s: %s' % (errprefix, errmsg)
259 259 raise onerr(errmsg)
260 260 return rc
261 261
262 262 return chgui(srcui)
263 263
264 264 def _loadnewui(srcui, args):
265 265 newui = srcui.__class__()
266 266 for a in ['fin', 'fout', 'ferr', 'environ']:
267 267 setattr(newui, a, getattr(srcui, a))
268 268 if util.safehasattr(srcui, '_csystem'):
269 269 newui._csystem = srcui._csystem
270 270
271 271 # internal config: extensions.chgserver
272 272 newui.setconfig('extensions', 'chgserver',
273 273 srcui.config('extensions', 'chgserver'), '--config')
274 274
275 275 # command line args
276 276 args = args[:]
277 277 dispatch._parseconfig(newui, dispatch._earlygetopt(['--config'], args))
278 278
279 279 # stolen from tortoisehg.util.copydynamicconfig()
280 280 for section, name, value in srcui.walkconfig():
281 281 source = srcui.configsource(section, name)
282 282 if ':' in source or source == '--config':
283 283 # path:line or command line
284 284 continue
285 285 if source == 'none':
286 286 # ui.configsource returns 'none' by default
287 287 source = ''
288 288 newui.setconfig(section, name, value, source)
289 289
290 290 # load wd and repo config, copied from dispatch.py
291 291 cwds = dispatch._earlygetopt(['--cwd'], args)
292 292 cwd = cwds and os.path.realpath(cwds[-1]) or None
293 293 rpath = dispatch._earlygetopt(["-R", "--repository", "--repo"], args)
294 294 path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)
295 295
296 296 return (newui, newlui)
297 297
298 298 class channeledsystem(object):
299 299 """Propagate ui.system() request in the following format:
300 300
301 301 payload length (unsigned int),
302 302 cmd, '\0',
303 303 cwd, '\0',
304 304 envkey, '=', val, '\0',
305 305 ...
306 306 envkey, '=', val
307 307
308 308 and waits:
309 309
310 310 exitcode length (unsigned int),
311 311 exitcode (int)
312 312 """
313 313 def __init__(self, in_, out, channel):
314 314 self.in_ = in_
315 315 self.out = out
316 316 self.channel = channel
317 317
318 318 def __call__(self, cmd, environ, cwd):
319 319 args = [util.quotecommand(cmd), os.path.abspath(cwd or '.')]
320 320 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
321 321 data = '\0'.join(args)
322 322 self.out.write(struct.pack('>cI', self.channel, len(data)))
323 323 self.out.write(data)
324 324 self.out.flush()
325 325
326 326 length = self.in_.read(4)
327 327 length, = struct.unpack('>I', length)
328 328 if length != 4:
329 329 raise error.Abort(_('invalid response'))
330 330 rc, = struct.unpack('>i', self.in_.read(4))
331 331 return rc
332 332
333 333 _iochannels = [
334 334 # server.ch, ui.fp, mode
335 335 ('cin', 'fin', 'rb'),
336 336 ('cout', 'fout', 'wb'),
337 337 ('cerr', 'ferr', 'wb'),
338 338 ]
339 339
340 340 class chgcmdserver(commandserver.server):
341 341 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
342 342 super(chgcmdserver, self).__init__(
343 343 _newchgui(ui, channeledsystem(fin, fout, 'S')), repo, fin, fout)
344 344 self.clientsock = sock
345 345 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
346 346 self.hashstate = hashstate
347 347 self.baseaddress = baseaddress
348 348 if hashstate is not None:
349 349 self.capabilities = self.capabilities.copy()
350 350 self.capabilities['validate'] = chgcmdserver.validate
351 351
352 352 def cleanup(self):
353 353 super(chgcmdserver, self).cleanup()
354 354 # dispatch._runcatch() does not flush outputs if exception is not
355 355 # handled by dispatch._dispatch()
356 356 self.ui.flush()
357 357 self._restoreio()
358 358
359 359 def attachio(self):
360 360 """Attach to client's stdio passed via unix domain socket; all
361 361 channels except cresult will no longer be used
362 362 """
363 363 # tell client to sendmsg() with 1-byte payload, which makes it
364 364 # distinctive from "attachio\n" command consumed by client.read()
365 365 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
366 366 clientfds = osutil.recvfds(self.clientsock.fileno())
367 367 _log('received fds: %r\n' % clientfds)
368 368
369 369 ui = self.ui
370 370 ui.flush()
371 371 first = self._saveio()
372 372 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
373 373 assert fd > 0
374 374 fp = getattr(ui, fn)
375 375 os.dup2(fd, fp.fileno())
376 376 os.close(fd)
377 377 if not first:
378 378 continue
379 379 # reset buffering mode when client is first attached. as we want
380 380 # to see output immediately on pager, the mode stays unchanged
381 381 # when client re-attached. ferr is unchanged because it should
382 382 # be unbuffered no matter if it is a tty or not.
383 383 if fn == 'ferr':
384 384 newfp = fp
385 385 else:
386 386 # make it line buffered explicitly because the default is
387 387 # decided on first write(), where fout could be a pager.
388 388 if fp.isatty():
389 389 bufsize = 1 # line buffered
390 390 else:
391 391 bufsize = -1 # system default
392 392 newfp = os.fdopen(fp.fileno(), mode, bufsize)
393 393 setattr(ui, fn, newfp)
394 394 setattr(self, cn, newfp)
395 395
396 396 self.cresult.write(struct.pack('>i', len(clientfds)))
397 397
398 398 def _saveio(self):
399 399 if self._oldios:
400 400 return False
401 401 ui = self.ui
402 402 for cn, fn, _mode in _iochannels:
403 403 ch = getattr(self, cn)
404 404 fp = getattr(ui, fn)
405 405 fd = os.dup(fp.fileno())
406 406 self._oldios.append((ch, fp, fd))
407 407 return True
408 408
409 409 def _restoreio(self):
410 410 ui = self.ui
411 411 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
412 412 newfp = getattr(ui, fn)
413 413 # close newfp while it's associated with client; otherwise it
414 414 # would be closed when newfp is deleted
415 415 if newfp is not fp:
416 416 newfp.close()
417 417 # restore original fd: fp is open again
418 418 os.dup2(fd, fp.fileno())
419 419 os.close(fd)
420 420 setattr(self, cn, ch)
421 421 setattr(ui, fn, fp)
422 422 del self._oldios[:]
423 423
424 424 def validate(self):
425 425 """Reload the config and check if the server is up to date
426 426
427 427 Read a list of '\0' separated arguments.
428 428 Write a non-empty list of '\0' separated instruction strings or '\0'
429 429 if the list is empty.
430 430 An instruction string could be either:
431 431 - "unlink $path", the client should unlink the path to stop the
432 432 outdated server.
433 433 - "redirect $path", the client should attempt to connect to $path
434 434 first. If it does not work, start a new server. It implies
435 435 "reconnect".
436 436 - "exit $n", the client should exit directly with code n.
437 437 This may happen if we cannot parse the config.
438 438 - "reconnect", the client should close the connection and
439 439 reconnect.
440 440 If neither "reconnect" nor "redirect" is included in the instruction
441 441 list, the client can continue with this server after completing all
442 442 the instructions.
443 443 """
444 444 args = self._readlist()
445 445 try:
446 446 self.ui, lui = _loadnewui(self.ui, args)
447 447 except error.ParseError as inst:
448 448 dispatch._formatparse(self.ui.warn, inst)
449 449 self.ui.flush()
450 450 self.cresult.write('exit 255')
451 451 return
452 452 newhash = hashstate.fromui(lui, self.hashstate.mtimepaths)
453 453 insts = []
454 454 if newhash.mtimehash != self.hashstate.mtimehash:
455 455 addr = _hashaddress(self.baseaddress, self.hashstate.confighash)
456 456 insts.append('unlink %s' % addr)
457 457 # mtimehash is empty if one or more extensions fail to load.
458 458 # to be compatible with hg, still serve the client this time.
459 459 if self.hashstate.mtimehash:
460 460 insts.append('reconnect')
461 461 if newhash.confighash != self.hashstate.confighash:
462 462 addr = _hashaddress(self.baseaddress, newhash.confighash)
463 463 insts.append('redirect %s' % addr)
464 464 _log('validate: %s\n' % insts)
465 465 self.cresult.write('\0'.join(insts) or '\0')
466 466
467 467 def chdir(self):
468 468 """Change current directory
469 469
470 470 Note that the behavior of --cwd option is bit different from this.
471 471 It does not affect --config parameter.
472 472 """
473 473 path = self._readstr()
474 474 if not path:
475 475 return
476 476 _log('chdir to %r\n' % path)
477 477 os.chdir(path)
478 478
479 479 def setumask(self):
480 480 """Change umask"""
481 481 mask = struct.unpack('>I', self._read(4))[0]
482 482 _log('setumask %r\n' % mask)
483 483 os.umask(mask)
484 484
485 485 def getpager(self):
486 486 """Read cmdargs and write pager command to r-channel if enabled
487 487
488 488 If pager isn't enabled, this writes '\0' because channeledoutput
489 489 does not allow to write empty data.
490 490 """
491 491 args = self._readlist()
492 492 try:
493 493 cmd, _func, args, options, _cmdoptions = dispatch._parse(self.ui,
494 494 args)
495 495 except (error.Abort, error.AmbiguousCommand, error.CommandError,
496 496 error.UnknownCommand):
497 497 cmd = None
498 498 options = {}
499 499 if not cmd or 'pager' not in options:
500 500 self.cresult.write('\0')
501 501 return
502 502
503 503 pagercmd = _setuppagercmd(self.ui, options, cmd)
504 504 if pagercmd:
505 505 # Python's SIGPIPE is SIG_IGN by default. change to SIG_DFL so
506 506 # we can exit if the pipe to the pager is closed
507 507 if util.safehasattr(signal, 'SIGPIPE') and \
508 508 signal.getsignal(signal.SIGPIPE) == signal.SIG_IGN:
509 509 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
510 510 self.cresult.write(pagercmd)
511 511 else:
512 512 self.cresult.write('\0')
513 513
514 514 def setenv(self):
515 515 """Clear and update os.environ
516 516
517 517 Note that not all variables can make an effect on the running process.
518 518 """
519 519 l = self._readlist()
520 520 try:
521 521 newenv = dict(s.split('=', 1) for s in l)
522 522 except ValueError:
523 523 raise ValueError('unexpected value in setenv request')
524 524 _log('setenv: %r\n' % sorted(newenv.keys()))
525 525 os.environ.clear()
526 526 os.environ.update(newenv)
527 527
528 528 capabilities = commandserver.server.capabilities.copy()
529 529 capabilities.update({'attachio': attachio,
530 530 'chdir': chdir,
531 531 'getpager': getpager,
532 532 'setenv': setenv,
533 533 'setumask': setumask})
534 534
535 535 class _requesthandler(commandserver._requesthandler):
536 def _createcmdserver(self, conn, fin, fout):
536 def _createcmdserver(self, repo, conn, fin, fout):
537 537 ui = self.server.ui
538 repo = self.server.repo
539 538 return chgcmdserver(ui, repo, fin, fout, conn,
540 539 self.server.hashstate, self.server.baseaddress)
541 540
542 541 def _tempaddress(address):
543 542 return '%s.%d.tmp' % (address, os.getpid())
544 543
545 544 def _hashaddress(address, hashstr):
546 545 return '%s-%s' % (address, hashstr)
547 546
548 547 class AutoExitMixIn: # use old-style to comply with SocketServer design
549 548 lastactive = time.time()
550 549 idletimeout = 3600 # default 1 hour
551 550
552 551 def startautoexitthread(self):
553 552 # note: the auto-exit check here is cheap enough to not use a thread,
554 553 # be done in serve_forever. however SocketServer is hook-unfriendly,
555 554 # you simply cannot hook serve_forever without copying a lot of code.
556 555 # besides, serve_forever's docstring suggests using thread.
557 556 thread = threading.Thread(target=self._autoexitloop)
558 557 thread.daemon = True
559 558 thread.start()
560 559
561 560 def _autoexitloop(self, interval=1):
562 561 while True:
563 562 time.sleep(interval)
564 563 if not self.issocketowner():
565 564 _log('%s is not owned, exiting.\n' % self.server_address)
566 565 break
567 566 if time.time() - self.lastactive > self.idletimeout:
568 567 _log('being idle too long. exiting.\n')
569 568 break
570 569 self.shutdown()
571 570
572 571 def process_request(self, request, address):
573 572 self.lastactive = time.time()
574 573 return socketserver.ForkingMixIn.process_request(
575 574 self, request, address)
576 575
577 576 def server_bind(self):
578 577 # use a unique temp address so we can stat the file and do ownership
579 578 # check later
580 579 tempaddress = _tempaddress(self.server_address)
581 580 util.bindunixsocket(self.socket, tempaddress)
582 581 self._socketstat = os.stat(tempaddress)
583 582 # rename will replace the old socket file if exists atomically. the
584 583 # old server will detect ownership change and exit.
585 584 util.rename(tempaddress, self.server_address)
586 585
587 586 def issocketowner(self):
588 587 try:
589 588 stat = os.stat(self.server_address)
590 589 return (stat.st_ino == self._socketstat.st_ino and
591 590 stat.st_mtime == self._socketstat.st_mtime)
592 591 except OSError:
593 592 return False
594 593
595 594 def unlinksocketfile(self):
596 595 if not self.issocketowner():
597 596 return
598 597 # it is possible to have a race condition here that we may
599 598 # remove another server's socket file. but that's okay
600 599 # since that server will detect and exit automatically and
601 600 # the client will start a new server on demand.
602 601 try:
603 602 os.unlink(self.server_address)
604 603 except OSError as exc:
605 604 if exc.errno != errno.ENOENT:
606 605 raise
607 606
608 607 class chgunixservice(commandserver.unixservice):
609 608 def __init__(self, ui, repo, opts):
610 609 super(chgunixservice, self).__init__(ui, repo=None, opts=opts)
611 610 if repo:
612 611 # one chgserver can serve multiple repos. drop repo infomation
613 612 self.ui.setconfig('bundle', 'mainreporoot', '', 'repo')
614 613
615 614 def init(self):
616 615 self._inithashstate()
617 616 self._checkextensions()
618 617 class cls(AutoExitMixIn, socketserver.ForkingMixIn,
619 618 socketserver.UnixStreamServer):
620 619 ui = self.ui
621 620 repo = self.repo
622 621 hashstate = self.hashstate
623 622 baseaddress = self.baseaddress
624 623 self.server = cls(self.address, _requesthandler)
625 624 self.server.idletimeout = self.ui.configint(
626 625 'chgserver', 'idletimeout', self.server.idletimeout)
627 626 self.server.startautoexitthread()
628 627 self._createsymlink()
629 628
630 629 def _inithashstate(self):
631 630 self.baseaddress = self.address
632 631 if self.ui.configbool('chgserver', 'skiphash', False):
633 632 self.hashstate = None
634 633 return
635 634 self.hashstate = hashstate.fromui(self.ui)
636 635 self.address = _hashaddress(self.address, self.hashstate.confighash)
637 636
638 637 def _checkextensions(self):
639 638 if not self.hashstate:
640 639 return
641 640 if extensions.notloaded():
642 641 # one or more extensions failed to load. mtimehash becomes
643 642 # meaningless because we do not know the paths of those extensions.
644 643 # set mtimehash to an illegal hash value to invalidate the server.
645 644 self.hashstate.mtimehash = ''
646 645
647 646 def _createsymlink(self):
648 647 if self.baseaddress == self.address:
649 648 return
650 649 tempaddress = _tempaddress(self.baseaddress)
651 650 os.symlink(os.path.basename(self.address), tempaddress)
652 651 util.rename(tempaddress, self.baseaddress)
653 652
654 653 def _cleanup(self):
655 654 self.server.unlinksocketfile()
656 655
657 656 def uisetup(ui):
658 657 commandserver._servicemap['chgunix'] = chgunixservice
659 658
660 659 # CHGINTERNALMARK is temporarily set by chg client to detect if chg will
661 660 # start another chg. drop it to avoid possible side effects.
662 661 if 'CHGINTERNALMARK' in os.environ:
663 662 del os.environ['CHGINTERNALMARK']
@@ -1,435 +1,437
1 1 # commandserver.py - communicate with Mercurial's API over a pipe
2 2 #
3 3 # Copyright Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import gc
12 12 import os
13 13 import random
14 14 import struct
15 15 import sys
16 16 import traceback
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 encoding,
21 21 error,
22 22 util,
23 23 )
24 24
25 25 socketserver = util.socketserver
26 26
27 27 logfile = None
28 28
29 29 def log(*args):
30 30 if not logfile:
31 31 return
32 32
33 33 for a in args:
34 34 logfile.write(str(a))
35 35
36 36 logfile.flush()
37 37
38 38 class channeledoutput(object):
39 39 """
40 40 Write data to out in the following format:
41 41
42 42 data length (unsigned int),
43 43 data
44 44 """
45 45 def __init__(self, out, channel):
46 46 self.out = out
47 47 self.channel = channel
48 48
49 49 @property
50 50 def name(self):
51 51 return '<%c-channel>' % self.channel
52 52
53 53 def write(self, data):
54 54 if not data:
55 55 return
56 56 self.out.write(struct.pack('>cI', self.channel, len(data)))
57 57 self.out.write(data)
58 58 self.out.flush()
59 59
60 60 def __getattr__(self, attr):
61 61 if attr in ('isatty', 'fileno', 'tell', 'seek'):
62 62 raise AttributeError(attr)
63 63 return getattr(self.out, attr)
64 64
65 65 class channeledinput(object):
66 66 """
67 67 Read data from in_.
68 68
69 69 Requests for input are written to out in the following format:
70 70 channel identifier - 'I' for plain input, 'L' line based (1 byte)
71 71 how many bytes to send at most (unsigned int),
72 72
73 73 The client replies with:
74 74 data length (unsigned int), 0 meaning EOF
75 75 data
76 76 """
77 77
78 78 maxchunksize = 4 * 1024
79 79
80 80 def __init__(self, in_, out, channel):
81 81 self.in_ = in_
82 82 self.out = out
83 83 self.channel = channel
84 84
85 85 @property
86 86 def name(self):
87 87 return '<%c-channel>' % self.channel
88 88
89 89 def read(self, size=-1):
90 90 if size < 0:
91 91 # if we need to consume all the clients input, ask for 4k chunks
92 92 # so the pipe doesn't fill up risking a deadlock
93 93 size = self.maxchunksize
94 94 s = self._read(size, self.channel)
95 95 buf = s
96 96 while s:
97 97 s = self._read(size, self.channel)
98 98 buf += s
99 99
100 100 return buf
101 101 else:
102 102 return self._read(size, self.channel)
103 103
104 104 def _read(self, size, channel):
105 105 if not size:
106 106 return ''
107 107 assert size > 0
108 108
109 109 # tell the client we need at most size bytes
110 110 self.out.write(struct.pack('>cI', channel, size))
111 111 self.out.flush()
112 112
113 113 length = self.in_.read(4)
114 114 length = struct.unpack('>I', length)[0]
115 115 if not length:
116 116 return ''
117 117 else:
118 118 return self.in_.read(length)
119 119
120 120 def readline(self, size=-1):
121 121 if size < 0:
122 122 size = self.maxchunksize
123 123 s = self._read(size, 'L')
124 124 buf = s
125 125 # keep asking for more until there's either no more or
126 126 # we got a full line
127 127 while s and s[-1] != '\n':
128 128 s = self._read(size, 'L')
129 129 buf += s
130 130
131 131 return buf
132 132 else:
133 133 return self._read(size, 'L')
134 134
135 135 def __iter__(self):
136 136 return self
137 137
138 138 def next(self):
139 139 l = self.readline()
140 140 if not l:
141 141 raise StopIteration
142 142 return l
143 143
144 144 def __getattr__(self, attr):
145 145 if attr in ('isatty', 'fileno', 'tell', 'seek'):
146 146 raise AttributeError(attr)
147 147 return getattr(self.in_, attr)
148 148
149 149 class server(object):
150 150 """
151 151 Listens for commands on fin, runs them and writes the output on a channel
152 152 based stream to fout.
153 153 """
154 154 def __init__(self, ui, repo, fin, fout):
155 155 self.cwd = os.getcwd()
156 156
157 157 # developer config: cmdserver.log
158 158 logpath = ui.config("cmdserver", "log", None)
159 159 if logpath:
160 160 global logfile
161 161 if logpath == '-':
162 162 # write log on a special 'd' (debug) channel
163 163 logfile = channeledoutput(fout, 'd')
164 164 else:
165 165 logfile = open(logpath, 'a')
166 166
167 167 if repo:
168 168 # the ui here is really the repo ui so take its baseui so we don't
169 169 # end up with its local configuration
170 170 self.ui = repo.baseui
171 171 self.repo = repo
172 172 self.repoui = repo.ui
173 173 else:
174 174 self.ui = ui
175 175 self.repo = self.repoui = None
176 176
177 177 self.cerr = channeledoutput(fout, 'e')
178 178 self.cout = channeledoutput(fout, 'o')
179 179 self.cin = channeledinput(fin, fout, 'I')
180 180 self.cresult = channeledoutput(fout, 'r')
181 181
182 182 self.client = fin
183 183
184 184 def cleanup(self):
185 185 """release and restore resources taken during server session"""
186 186 pass
187 187
188 188 def _read(self, size):
189 189 if not size:
190 190 return ''
191 191
192 192 data = self.client.read(size)
193 193
194 194 # is the other end closed?
195 195 if not data:
196 196 raise EOFError
197 197
198 198 return data
199 199
200 200 def _readstr(self):
201 201 """read a string from the channel
202 202
203 203 format:
204 204 data length (uint32), data
205 205 """
206 206 length = struct.unpack('>I', self._read(4))[0]
207 207 if not length:
208 208 return ''
209 209 return self._read(length)
210 210
211 211 def _readlist(self):
212 212 """read a list of NULL separated strings from the channel"""
213 213 s = self._readstr()
214 214 if s:
215 215 return s.split('\0')
216 216 else:
217 217 return []
218 218
219 219 def runcommand(self):
220 220 """ reads a list of \0 terminated arguments, executes
221 221 and writes the return code to the result channel """
222 222 from . import dispatch # avoid cycle
223 223
224 224 args = self._readlist()
225 225
226 226 # copy the uis so changes (e.g. --config or --verbose) don't
227 227 # persist between requests
228 228 copiedui = self.ui.copy()
229 229 uis = [copiedui]
230 230 if self.repo:
231 231 self.repo.baseui = copiedui
232 232 # clone ui without using ui.copy because this is protected
233 233 repoui = self.repoui.__class__(self.repoui)
234 234 repoui.copy = copiedui.copy # redo copy protection
235 235 uis.append(repoui)
236 236 self.repo.ui = self.repo.dirstate._ui = repoui
237 237 self.repo.invalidateall()
238 238
239 239 for ui in uis:
240 240 ui.resetstate()
241 241 # any kind of interaction must use server channels, but chg may
242 242 # replace channels by fully functional tty files. so nontty is
243 243 # enforced only if cin is a channel.
244 244 if not util.safehasattr(self.cin, 'fileno'):
245 245 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
246 246
247 247 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
248 248 self.cout, self.cerr)
249 249
250 250 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
251 251
252 252 # restore old cwd
253 253 if '--cwd' in args:
254 254 os.chdir(self.cwd)
255 255
256 256 self.cresult.write(struct.pack('>i', int(ret)))
257 257
258 258 def getencoding(self):
259 259 """ writes the current encoding to the result channel """
260 260 self.cresult.write(encoding.encoding)
261 261
262 262 def serveone(self):
263 263 cmd = self.client.readline()[:-1]
264 264 if cmd:
265 265 handler = self.capabilities.get(cmd)
266 266 if handler:
267 267 handler(self)
268 268 else:
269 269 # clients are expected to check what commands are supported by
270 270 # looking at the servers capabilities
271 271 raise error.Abort(_('unknown command %s') % cmd)
272 272
273 273 return cmd != ''
274 274
275 275 capabilities = {'runcommand' : runcommand,
276 276 'getencoding' : getencoding}
277 277
278 278 def serve(self):
279 279 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
280 280 hellomsg += '\n'
281 281 hellomsg += 'encoding: ' + encoding.encoding
282 282 hellomsg += '\n'
283 283 hellomsg += 'pid: %d' % util.getpid()
284 284
285 285 # write the hello msg in -one- chunk
286 286 self.cout.write(hellomsg)
287 287
288 288 try:
289 289 while self.serveone():
290 290 pass
291 291 except EOFError:
292 292 # we'll get here if the client disconnected while we were reading
293 293 # its request
294 294 return 1
295 295
296 296 return 0
297 297
298 298 def _protectio(ui):
299 299 """ duplicates streams and redirect original to null if ui uses stdio """
300 300 ui.flush()
301 301 newfiles = []
302 302 nullfd = os.open(os.devnull, os.O_RDWR)
303 303 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
304 304 (ui.fout, sys.stdout, 'wb')]:
305 305 if f is sysf:
306 306 newfd = os.dup(f.fileno())
307 307 os.dup2(nullfd, f.fileno())
308 308 f = os.fdopen(newfd, mode)
309 309 newfiles.append(f)
310 310 os.close(nullfd)
311 311 return tuple(newfiles)
312 312
313 313 def _restoreio(ui, fin, fout):
314 314 """ restores streams from duplicated ones """
315 315 ui.flush()
316 316 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
317 317 if f is not uif:
318 318 os.dup2(f.fileno(), uif.fileno())
319 319 f.close()
320 320
321 321 class pipeservice(object):
322 322 def __init__(self, ui, repo, opts):
323 323 self.ui = ui
324 324 self.repo = repo
325 325
326 326 def init(self):
327 327 pass
328 328
329 329 def run(self):
330 330 ui = self.ui
331 331 # redirect stdio to null device so that broken extensions or in-process
332 332 # hooks will never cause corruption of channel protocol.
333 333 fin, fout = _protectio(ui)
334 334 try:
335 335 sv = server(ui, self.repo, fin, fout)
336 336 return sv.serve()
337 337 finally:
338 338 sv.cleanup()
339 339 _restoreio(ui, fin, fout)
340 340
341 class _requesthandler(socketserver.BaseRequestHandler):
342 def handle(self):
341 def _serverequest(ui, repo, conn, createcmdserver):
342 if True: # TODO: unindent
343 343 # use a different process group from the master process, making this
344 344 # process pass kernel "is_current_pgrp_orphaned" check so signals like
345 345 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
346 346 os.setpgid(0, 0)
347 347 # change random state otherwise forked request handlers would have a
348 348 # same state inherited from parent.
349 349 random.seed()
350 ui = self.server.ui
351 350
352 conn = self.request
353 351 fin = conn.makefile('rb')
354 352 fout = conn.makefile('wb')
355 353 sv = None
356 354 try:
357 sv = self._createcmdserver(conn, fin, fout)
355 sv = createcmdserver(repo, conn, fin, fout)
358 356 try:
359 357 sv.serve()
360 358 # handle exceptions that may be raised by command server. most of
361 359 # known exceptions are caught by dispatch.
362 360 except error.Abort as inst:
363 361 ui.warn(_('abort: %s\n') % inst)
364 362 except IOError as inst:
365 363 if inst.errno != errno.EPIPE:
366 364 raise
367 365 except KeyboardInterrupt:
368 366 pass
369 367 finally:
370 368 sv.cleanup()
371 369 except: # re-raises
372 370 # also write traceback to error channel. otherwise client cannot
373 371 # see it because it is written to server's stderr by default.
374 372 if sv:
375 373 cerr = sv.cerr
376 374 else:
377 375 cerr = channeledoutput(fout, 'e')
378 376 traceback.print_exc(file=cerr)
379 377 raise
380 378 finally:
381 379 fin.close()
382 380 try:
383 381 fout.close() # implicit flush() may cause another EPIPE
384 382 except IOError as inst:
385 383 if inst.errno != errno.EPIPE:
386 384 raise
387 385 # trigger __del__ since ForkingMixIn uses os._exit
388 386 gc.collect()
389 387
390 def _createcmdserver(self, conn, fin, fout):
388 class _requesthandler(socketserver.BaseRequestHandler):
389 def handle(self):
390 _serverequest(self.server.ui, self.server.repo, self.request,
391 self._createcmdserver)
392
393 def _createcmdserver(self, repo, conn, fin, fout):
391 394 ui = self.server.ui
392 repo = self.server.repo
393 395 return server(ui, repo, fin, fout)
394 396
395 397 class unixservice(object):
396 398 """
397 399 Listens on unix domain socket and forks server per connection
398 400 """
399 401 def __init__(self, ui, repo, opts):
400 402 self.ui = ui
401 403 self.repo = repo
402 404 self.address = opts['address']
403 405 if not util.safehasattr(socketserver, 'UnixStreamServer'):
404 406 raise error.Abort(_('unsupported platform'))
405 407 if not self.address:
406 408 raise error.Abort(_('no socket path specified with --address'))
407 409
408 410 def init(self):
409 411 class cls(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
410 412 ui = self.ui
411 413 repo = self.repo
412 414 self.server = cls(self.address, _requesthandler)
413 415 self.ui.status(_('listening at %s\n') % self.address)
414 416 self.ui.flush() # avoid buffering of status message
415 417
416 418 def _cleanup(self):
417 419 os.unlink(self.address)
418 420
419 421 def run(self):
420 422 try:
421 423 self.server.serve_forever()
422 424 finally:
423 425 self._cleanup()
424 426
425 427 _servicemap = {
426 428 'pipe': pipeservice,
427 429 'unix': unixservice,
428 430 }
429 431
430 432 def createservice(ui, repo, opts):
431 433 mode = opts['cmdserver']
432 434 try:
433 435 return _servicemap[mode](ui, repo, opts)
434 436 except KeyError:
435 437 raise error.Abort(_('unknown mode %s') % mode)
General Comments 0
You need to be logged in to leave comments. Login now