##// END OF EJS Templates
commandserver: manually create file objects from socket...
Yuya Nishihara -
r29542:6011ad3b default
parent child Browse files
Show More
@@ -1,663 +1,663
1 1 # chgserver.py - command server extension for cHg
2 2 #
3 3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """command server extension for cHg (EXPERIMENTAL)
9 9
10 10 'S' channel (read/write)
11 11 propagate ui.system() request to client
12 12
13 13 'attachio' command
14 14 attach client's stdio passed by sendmsg()
15 15
16 16 'chdir' command
17 17 change current directory
18 18
19 19 'getpager' command
20 20 checks if pager is enabled and which pager should be executed
21 21
22 22 'setenv' command
23 23 replace os.environ completely
24 24
25 25 'setumask' command
26 26 set umask
27 27
28 28 'validate' command
29 29 reload the config and check if the server is up to date
30 30
31 31 Config
32 32 ------
33 33
34 34 ::
35 35
36 36 [chgserver]
37 37 idletimeout = 3600 # seconds, after which an idle server will exit
38 38 skiphash = False # whether to skip config or env change checks
39 39 """
40 40
41 41 from __future__ import absolute_import
42 42
43 43 import errno
44 44 import hashlib
45 45 import inspect
46 46 import os
47 47 import re
48 48 import signal
49 49 import struct
50 50 import sys
51 51 import threading
52 52 import time
53 53
54 54 from mercurial.i18n import _
55 55
56 56 from mercurial import (
57 57 cmdutil,
58 58 commands,
59 59 commandserver,
60 60 dispatch,
61 61 error,
62 62 extensions,
63 63 osutil,
64 64 util,
65 65 )
66 66
67 67 socketserver = util.socketserver
68 68
69 69 # Note for extension authors: ONLY specify testedwith = 'internal' for
70 70 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
71 71 # be specifying the version(s) of Mercurial they are tested with, or
72 72 # leave the attribute unspecified.
73 73 testedwith = 'internal'
74 74
75 75 _log = commandserver.log
76 76
77 77 def _hashlist(items):
78 78 """return sha1 hexdigest for a list"""
79 79 return hashlib.sha1(str(items)).hexdigest()
80 80
81 81 # sensitive config sections affecting confighash
82 82 _configsections = [
83 83 'alias', # affects global state commands.table
84 84 'extdiff', # uisetup will register new commands
85 85 'extensions',
86 86 ]
87 87
88 88 # sensitive environment variables affecting confighash
89 89 _envre = re.compile(r'''\A(?:
90 90 CHGHG
91 91 |HG.*
92 92 |LANG(?:UAGE)?
93 93 |LC_.*
94 94 |LD_.*
95 95 |PATH
96 96 |PYTHON.*
97 97 |TERM(?:INFO)?
98 98 |TZ
99 99 )\Z''', re.X)
100 100
101 101 def _confighash(ui):
102 102 """return a quick hash for detecting config/env changes
103 103
104 104 confighash is the hash of sensitive config items and environment variables.
105 105
106 106 for chgserver, it is designed that once confighash changes, the server is
107 107 not qualified to serve its client and should redirect the client to a new
108 108 server. different from mtimehash, confighash change will not mark the
109 109 server outdated and exit since the user can have different configs at the
110 110 same time.
111 111 """
112 112 sectionitems = []
113 113 for section in _configsections:
114 114 sectionitems.append(ui.configitems(section))
115 115 sectionhash = _hashlist(sectionitems)
116 116 envitems = [(k, v) for k, v in os.environ.iteritems() if _envre.match(k)]
117 117 envhash = _hashlist(sorted(envitems))
118 118 return sectionhash[:6] + envhash[:6]
119 119
120 120 def _getmtimepaths(ui):
121 121 """get a list of paths that should be checked to detect change
122 122
123 123 The list will include:
124 124 - extensions (will not cover all files for complex extensions)
125 125 - mercurial/__version__.py
126 126 - python binary
127 127 """
128 128 modules = [m for n, m in extensions.extensions(ui)]
129 129 try:
130 130 from mercurial import __version__
131 131 modules.append(__version__)
132 132 except ImportError:
133 133 pass
134 134 files = [sys.executable]
135 135 for m in modules:
136 136 try:
137 137 files.append(inspect.getabsfile(m))
138 138 except TypeError:
139 139 pass
140 140 return sorted(set(files))
141 141
142 142 def _mtimehash(paths):
143 143 """return a quick hash for detecting file changes
144 144
145 145 mtimehash calls stat on given paths and calculate a hash based on size and
146 146 mtime of each file. mtimehash does not read file content because reading is
147 147 expensive. therefore it's not 100% reliable for detecting content changes.
148 148 it's possible to return different hashes for same file contents.
149 149 it's also possible to return a same hash for different file contents for
150 150 some carefully crafted situation.
151 151
152 152 for chgserver, it is designed that once mtimehash changes, the server is
153 153 considered outdated immediately and should no longer provide service.
154 154
155 155 mtimehash is not included in confighash because we only know the paths of
156 156 extensions after importing them (there is imp.find_module but that faces
157 157 race conditions). We need to calculate confighash without importing.
158 158 """
159 159 def trystat(path):
160 160 try:
161 161 st = os.stat(path)
162 162 return (st.st_mtime, st.st_size)
163 163 except OSError:
164 164 # could be ENOENT, EPERM etc. not fatal in any case
165 165 pass
166 166 return _hashlist(map(trystat, paths))[:12]
167 167
168 168 class hashstate(object):
169 169 """a structure storing confighash, mtimehash, paths used for mtimehash"""
170 170 def __init__(self, confighash, mtimehash, mtimepaths):
171 171 self.confighash = confighash
172 172 self.mtimehash = mtimehash
173 173 self.mtimepaths = mtimepaths
174 174
175 175 @staticmethod
176 176 def fromui(ui, mtimepaths=None):
177 177 if mtimepaths is None:
178 178 mtimepaths = _getmtimepaths(ui)
179 179 confighash = _confighash(ui)
180 180 mtimehash = _mtimehash(mtimepaths)
181 181 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
182 182 return hashstate(confighash, mtimehash, mtimepaths)
183 183
184 184 # copied from hgext/pager.py:uisetup()
185 185 def _setuppagercmd(ui, options, cmd):
186 186 if not ui.formatted():
187 187 return
188 188
189 189 p = ui.config("pager", "pager", os.environ.get("PAGER"))
190 190 usepager = False
191 191 always = util.parsebool(options['pager'])
192 192 auto = options['pager'] == 'auto'
193 193
194 194 if not p:
195 195 pass
196 196 elif always:
197 197 usepager = True
198 198 elif not auto:
199 199 usepager = False
200 200 else:
201 201 attended = ['annotate', 'cat', 'diff', 'export', 'glog', 'log', 'qdiff']
202 202 attend = ui.configlist('pager', 'attend', attended)
203 203 ignore = ui.configlist('pager', 'ignore')
204 204 cmds, _ = cmdutil.findcmd(cmd, commands.table)
205 205
206 206 for cmd in cmds:
207 207 var = 'attend-%s' % cmd
208 208 if ui.config('pager', var):
209 209 usepager = ui.configbool('pager', var)
210 210 break
211 211 if (cmd in attend or
212 212 (cmd not in ignore and not attend)):
213 213 usepager = True
214 214 break
215 215
216 216 if usepager:
217 217 ui.setconfig('ui', 'formatted', ui.formatted(), 'pager')
218 218 ui.setconfig('ui', 'interactive', False, 'pager')
219 219 return p
220 220
221 221 def _newchgui(srcui, csystem):
222 222 class chgui(srcui.__class__):
223 223 def __init__(self, src=None):
224 224 super(chgui, self).__init__(src)
225 225 if src:
226 226 self._csystem = getattr(src, '_csystem', csystem)
227 227 else:
228 228 self._csystem = csystem
229 229
230 230 def system(self, cmd, environ=None, cwd=None, onerr=None,
231 231 errprefix=None):
232 232 # fallback to the original system method if the output needs to be
233 233 # captured (to self._buffers), or the output stream is not stdout
234 234 # (e.g. stderr, cStringIO), because the chg client is not aware of
235 235 # these situations and will behave differently (write to stdout).
236 236 if (any(s[1] for s in self._bufferstates)
237 237 or not util.safehasattr(self.fout, 'fileno')
238 238 or self.fout.fileno() != sys.stdout.fileno()):
239 239 return super(chgui, self).system(cmd, environ, cwd, onerr,
240 240 errprefix)
241 241 # copied from mercurial/util.py:system()
242 242 self.flush()
243 243 def py2shell(val):
244 244 if val is None or val is False:
245 245 return '0'
246 246 if val is True:
247 247 return '1'
248 248 return str(val)
249 249 env = os.environ.copy()
250 250 if environ:
251 251 env.update((k, py2shell(v)) for k, v in environ.iteritems())
252 252 env['HG'] = util.hgexecutable()
253 253 rc = self._csystem(cmd, env, cwd)
254 254 if rc and onerr:
255 255 errmsg = '%s %s' % (os.path.basename(cmd.split(None, 1)[0]),
256 256 util.explainexit(rc)[0])
257 257 if errprefix:
258 258 errmsg = '%s: %s' % (errprefix, errmsg)
259 259 raise onerr(errmsg)
260 260 return rc
261 261
262 262 return chgui(srcui)
263 263
264 264 def _loadnewui(srcui, args):
265 265 newui = srcui.__class__()
266 266 for a in ['fin', 'fout', 'ferr', 'environ']:
267 267 setattr(newui, a, getattr(srcui, a))
268 268 if util.safehasattr(srcui, '_csystem'):
269 269 newui._csystem = srcui._csystem
270 270
271 271 # internal config: extensions.chgserver
272 272 newui.setconfig('extensions', 'chgserver',
273 273 srcui.config('extensions', 'chgserver'), '--config')
274 274
275 275 # command line args
276 276 args = args[:]
277 277 dispatch._parseconfig(newui, dispatch._earlygetopt(['--config'], args))
278 278
279 279 # stolen from tortoisehg.util.copydynamicconfig()
280 280 for section, name, value in srcui.walkconfig():
281 281 source = srcui.configsource(section, name)
282 282 if ':' in source or source == '--config':
283 283 # path:line or command line
284 284 continue
285 285 if source == 'none':
286 286 # ui.configsource returns 'none' by default
287 287 source = ''
288 288 newui.setconfig(section, name, value, source)
289 289
290 290 # load wd and repo config, copied from dispatch.py
291 291 cwds = dispatch._earlygetopt(['--cwd'], args)
292 292 cwd = cwds and os.path.realpath(cwds[-1]) or None
293 293 rpath = dispatch._earlygetopt(["-R", "--repository", "--repo"], args)
294 294 path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)
295 295
296 296 return (newui, newlui)
297 297
298 298 class channeledsystem(object):
299 299 """Propagate ui.system() request in the following format:
300 300
301 301 payload length (unsigned int),
302 302 cmd, '\0',
303 303 cwd, '\0',
304 304 envkey, '=', val, '\0',
305 305 ...
306 306 envkey, '=', val
307 307
308 308 and waits:
309 309
310 310 exitcode length (unsigned int),
311 311 exitcode (int)
312 312 """
313 313 def __init__(self, in_, out, channel):
314 314 self.in_ = in_
315 315 self.out = out
316 316 self.channel = channel
317 317
318 318 def __call__(self, cmd, environ, cwd):
319 319 args = [util.quotecommand(cmd), os.path.abspath(cwd or '.')]
320 320 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
321 321 data = '\0'.join(args)
322 322 self.out.write(struct.pack('>cI', self.channel, len(data)))
323 323 self.out.write(data)
324 324 self.out.flush()
325 325
326 326 length = self.in_.read(4)
327 327 length, = struct.unpack('>I', length)
328 328 if length != 4:
329 329 raise error.Abort(_('invalid response'))
330 330 rc, = struct.unpack('>i', self.in_.read(4))
331 331 return rc
332 332
333 333 _iochannels = [
334 334 # server.ch, ui.fp, mode
335 335 ('cin', 'fin', 'rb'),
336 336 ('cout', 'fout', 'wb'),
337 337 ('cerr', 'ferr', 'wb'),
338 338 ]
339 339
340 340 class chgcmdserver(commandserver.server):
341 341 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
342 342 super(chgcmdserver, self).__init__(
343 343 _newchgui(ui, channeledsystem(fin, fout, 'S')), repo, fin, fout)
344 344 self.clientsock = sock
345 345 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
346 346 self.hashstate = hashstate
347 347 self.baseaddress = baseaddress
348 348 if hashstate is not None:
349 349 self.capabilities = self.capabilities.copy()
350 350 self.capabilities['validate'] = chgcmdserver.validate
351 351
352 352 def cleanup(self):
353 353 super(chgcmdserver, self).cleanup()
354 354 # dispatch._runcatch() does not flush outputs if exception is not
355 355 # handled by dispatch._dispatch()
356 356 self.ui.flush()
357 357 self._restoreio()
358 358
359 359 def attachio(self):
360 360 """Attach to client's stdio passed via unix domain socket; all
361 361 channels except cresult will no longer be used
362 362 """
363 363 # tell client to sendmsg() with 1-byte payload, which makes it
364 364 # distinctive from "attachio\n" command consumed by client.read()
365 365 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
366 366 clientfds = osutil.recvfds(self.clientsock.fileno())
367 367 _log('received fds: %r\n' % clientfds)
368 368
369 369 ui = self.ui
370 370 ui.flush()
371 371 first = self._saveio()
372 372 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
373 373 assert fd > 0
374 374 fp = getattr(ui, fn)
375 375 os.dup2(fd, fp.fileno())
376 376 os.close(fd)
377 377 if not first:
378 378 continue
379 379 # reset buffering mode when client is first attached. as we want
380 380 # to see output immediately on pager, the mode stays unchanged
381 381 # when client re-attached. ferr is unchanged because it should
382 382 # be unbuffered no matter if it is a tty or not.
383 383 if fn == 'ferr':
384 384 newfp = fp
385 385 else:
386 386 # make it line buffered explicitly because the default is
387 387 # decided on first write(), where fout could be a pager.
388 388 if fp.isatty():
389 389 bufsize = 1 # line buffered
390 390 else:
391 391 bufsize = -1 # system default
392 392 newfp = os.fdopen(fp.fileno(), mode, bufsize)
393 393 setattr(ui, fn, newfp)
394 394 setattr(self, cn, newfp)
395 395
396 396 self.cresult.write(struct.pack('>i', len(clientfds)))
397 397
398 398 def _saveio(self):
399 399 if self._oldios:
400 400 return False
401 401 ui = self.ui
402 402 for cn, fn, _mode in _iochannels:
403 403 ch = getattr(self, cn)
404 404 fp = getattr(ui, fn)
405 405 fd = os.dup(fp.fileno())
406 406 self._oldios.append((ch, fp, fd))
407 407 return True
408 408
409 409 def _restoreio(self):
410 410 ui = self.ui
411 411 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
412 412 newfp = getattr(ui, fn)
413 413 # close newfp while it's associated with client; otherwise it
414 414 # would be closed when newfp is deleted
415 415 if newfp is not fp:
416 416 newfp.close()
417 417 # restore original fd: fp is open again
418 418 os.dup2(fd, fp.fileno())
419 419 os.close(fd)
420 420 setattr(self, cn, ch)
421 421 setattr(ui, fn, fp)
422 422 del self._oldios[:]
423 423
424 424 def validate(self):
425 425 """Reload the config and check if the server is up to date
426 426
427 427 Read a list of '\0' separated arguments.
428 428 Write a non-empty list of '\0' separated instruction strings or '\0'
429 429 if the list is empty.
430 430 An instruction string could be either:
431 431 - "unlink $path", the client should unlink the path to stop the
432 432 outdated server.
433 433 - "redirect $path", the client should attempt to connect to $path
434 434 first. If it does not work, start a new server. It implies
435 435 "reconnect".
436 436 - "exit $n", the client should exit directly with code n.
437 437 This may happen if we cannot parse the config.
438 438 - "reconnect", the client should close the connection and
439 439 reconnect.
440 440 If neither "reconnect" nor "redirect" is included in the instruction
441 441 list, the client can continue with this server after completing all
442 442 the instructions.
443 443 """
444 444 args = self._readlist()
445 445 try:
446 446 self.ui, lui = _loadnewui(self.ui, args)
447 447 except error.ParseError as inst:
448 448 dispatch._formatparse(self.ui.warn, inst)
449 449 self.ui.flush()
450 450 self.cresult.write('exit 255')
451 451 return
452 452 newhash = hashstate.fromui(lui, self.hashstate.mtimepaths)
453 453 insts = []
454 454 if newhash.mtimehash != self.hashstate.mtimehash:
455 455 addr = _hashaddress(self.baseaddress, self.hashstate.confighash)
456 456 insts.append('unlink %s' % addr)
457 457 # mtimehash is empty if one or more extensions fail to load.
458 458 # to be compatible with hg, still serve the client this time.
459 459 if self.hashstate.mtimehash:
460 460 insts.append('reconnect')
461 461 if newhash.confighash != self.hashstate.confighash:
462 462 addr = _hashaddress(self.baseaddress, newhash.confighash)
463 463 insts.append('redirect %s' % addr)
464 464 _log('validate: %s\n' % insts)
465 465 self.cresult.write('\0'.join(insts) or '\0')
466 466
467 467 def chdir(self):
468 468 """Change current directory
469 469
470 470 Note that the behavior of --cwd option is bit different from this.
471 471 It does not affect --config parameter.
472 472 """
473 473 path = self._readstr()
474 474 if not path:
475 475 return
476 476 _log('chdir to %r\n' % path)
477 477 os.chdir(path)
478 478
479 479 def setumask(self):
480 480 """Change umask"""
481 481 mask = struct.unpack('>I', self._read(4))[0]
482 482 _log('setumask %r\n' % mask)
483 483 os.umask(mask)
484 484
485 485 def getpager(self):
486 486 """Read cmdargs and write pager command to r-channel if enabled
487 487
488 488 If pager isn't enabled, this writes '\0' because channeledoutput
489 489 does not allow to write empty data.
490 490 """
491 491 args = self._readlist()
492 492 try:
493 493 cmd, _func, args, options, _cmdoptions = dispatch._parse(self.ui,
494 494 args)
495 495 except (error.Abort, error.AmbiguousCommand, error.CommandError,
496 496 error.UnknownCommand):
497 497 cmd = None
498 498 options = {}
499 499 if not cmd or 'pager' not in options:
500 500 self.cresult.write('\0')
501 501 return
502 502
503 503 pagercmd = _setuppagercmd(self.ui, options, cmd)
504 504 if pagercmd:
505 505 # Python's SIGPIPE is SIG_IGN by default. change to SIG_DFL so
506 506 # we can exit if the pipe to the pager is closed
507 507 if util.safehasattr(signal, 'SIGPIPE') and \
508 508 signal.getsignal(signal.SIGPIPE) == signal.SIG_IGN:
509 509 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
510 510 self.cresult.write(pagercmd)
511 511 else:
512 512 self.cresult.write('\0')
513 513
514 514 def setenv(self):
515 515 """Clear and update os.environ
516 516
517 517 Note that not all variables can make an effect on the running process.
518 518 """
519 519 l = self._readlist()
520 520 try:
521 521 newenv = dict(s.split('=', 1) for s in l)
522 522 except ValueError:
523 523 raise ValueError('unexpected value in setenv request')
524 524 _log('setenv: %r\n' % sorted(newenv.keys()))
525 525 os.environ.clear()
526 526 os.environ.update(newenv)
527 527
528 528 capabilities = commandserver.server.capabilities.copy()
529 529 capabilities.update({'attachio': attachio,
530 530 'chdir': chdir,
531 531 'getpager': getpager,
532 532 'setenv': setenv,
533 533 'setumask': setumask})
534 534
535 535 class _requesthandler(commandserver._requesthandler):
536 def _createcmdserver(self):
536 def _createcmdserver(self, conn, fin, fout):
537 537 ui = self.server.ui
538 538 repo = self.server.repo
539 return chgcmdserver(ui, repo, self.rfile, self.wfile, self.connection,
539 return chgcmdserver(ui, repo, fin, fout, conn,
540 540 self.server.hashstate, self.server.baseaddress)
541 541
542 542 def _tempaddress(address):
543 543 return '%s.%d.tmp' % (address, os.getpid())
544 544
545 545 def _hashaddress(address, hashstr):
546 546 return '%s-%s' % (address, hashstr)
547 547
548 548 class AutoExitMixIn: # use old-style to comply with SocketServer design
549 549 lastactive = time.time()
550 550 idletimeout = 3600 # default 1 hour
551 551
552 552 def startautoexitthread(self):
553 553 # note: the auto-exit check here is cheap enough to not use a thread,
554 554 # be done in serve_forever. however SocketServer is hook-unfriendly,
555 555 # you simply cannot hook serve_forever without copying a lot of code.
556 556 # besides, serve_forever's docstring suggests using thread.
557 557 thread = threading.Thread(target=self._autoexitloop)
558 558 thread.daemon = True
559 559 thread.start()
560 560
561 561 def _autoexitloop(self, interval=1):
562 562 while True:
563 563 time.sleep(interval)
564 564 if not self.issocketowner():
565 565 _log('%s is not owned, exiting.\n' % self.server_address)
566 566 break
567 567 if time.time() - self.lastactive > self.idletimeout:
568 568 _log('being idle too long. exiting.\n')
569 569 break
570 570 self.shutdown()
571 571
572 572 def process_request(self, request, address):
573 573 self.lastactive = time.time()
574 574 return socketserver.ForkingMixIn.process_request(
575 575 self, request, address)
576 576
577 577 def server_bind(self):
578 578 # use a unique temp address so we can stat the file and do ownership
579 579 # check later
580 580 tempaddress = _tempaddress(self.server_address)
581 581 util.bindunixsocket(self.socket, tempaddress)
582 582 self._socketstat = os.stat(tempaddress)
583 583 # rename will replace the old socket file if exists atomically. the
584 584 # old server will detect ownership change and exit.
585 585 util.rename(tempaddress, self.server_address)
586 586
587 587 def issocketowner(self):
588 588 try:
589 589 stat = os.stat(self.server_address)
590 590 return (stat.st_ino == self._socketstat.st_ino and
591 591 stat.st_mtime == self._socketstat.st_mtime)
592 592 except OSError:
593 593 return False
594 594
595 595 def unlinksocketfile(self):
596 596 if not self.issocketowner():
597 597 return
598 598 # it is possible to have a race condition here that we may
599 599 # remove another server's socket file. but that's okay
600 600 # since that server will detect and exit automatically and
601 601 # the client will start a new server on demand.
602 602 try:
603 603 os.unlink(self.server_address)
604 604 except OSError as exc:
605 605 if exc.errno != errno.ENOENT:
606 606 raise
607 607
608 608 class chgunixservice(commandserver.unixservice):
609 609 def __init__(self, ui, repo, opts):
610 610 super(chgunixservice, self).__init__(ui, repo=None, opts=opts)
611 611 if repo:
612 612 # one chgserver can serve multiple repos. drop repo infomation
613 613 self.ui.setconfig('bundle', 'mainreporoot', '', 'repo')
614 614
615 615 def init(self):
616 616 self._inithashstate()
617 617 self._checkextensions()
618 618 class cls(AutoExitMixIn, socketserver.ForkingMixIn,
619 619 socketserver.UnixStreamServer):
620 620 ui = self.ui
621 621 repo = self.repo
622 622 hashstate = self.hashstate
623 623 baseaddress = self.baseaddress
624 624 self.server = cls(self.address, _requesthandler)
625 625 self.server.idletimeout = self.ui.configint(
626 626 'chgserver', 'idletimeout', self.server.idletimeout)
627 627 self.server.startautoexitthread()
628 628 self._createsymlink()
629 629
630 630 def _inithashstate(self):
631 631 self.baseaddress = self.address
632 632 if self.ui.configbool('chgserver', 'skiphash', False):
633 633 self.hashstate = None
634 634 return
635 635 self.hashstate = hashstate.fromui(self.ui)
636 636 self.address = _hashaddress(self.address, self.hashstate.confighash)
637 637
638 638 def _checkextensions(self):
639 639 if not self.hashstate:
640 640 return
641 641 if extensions.notloaded():
642 642 # one or more extensions failed to load. mtimehash becomes
643 643 # meaningless because we do not know the paths of those extensions.
644 644 # set mtimehash to an illegal hash value to invalidate the server.
645 645 self.hashstate.mtimehash = ''
646 646
647 647 def _createsymlink(self):
648 648 if self.baseaddress == self.address:
649 649 return
650 650 tempaddress = _tempaddress(self.baseaddress)
651 651 os.symlink(os.path.basename(self.address), tempaddress)
652 652 util.rename(tempaddress, self.baseaddress)
653 653
654 654 def _cleanup(self):
655 655 self.server.unlinksocketfile()
656 656
657 657 def uisetup(ui):
658 658 commandserver._servicemap['chgunix'] = chgunixservice
659 659
660 660 # CHGINTERNALMARK is temporarily set by chg client to detect if chg will
661 661 # start another chg. drop it to avoid possible side effects.
662 662 if 'CHGINTERNALMARK' in os.environ:
663 663 del os.environ['CHGINTERNALMARK']
@@ -1,425 +1,435
1 1 # commandserver.py - communicate with Mercurial's API over a pipe
2 2 #
3 3 # Copyright Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import gc
12 12 import os
13 13 import random
14 14 import struct
15 15 import sys
16 16 import traceback
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 encoding,
21 21 error,
22 22 util,
23 23 )
24 24
25 25 socketserver = util.socketserver
26 26
27 27 logfile = None
28 28
29 29 def log(*args):
30 30 if not logfile:
31 31 return
32 32
33 33 for a in args:
34 34 logfile.write(str(a))
35 35
36 36 logfile.flush()
37 37
38 38 class channeledoutput(object):
39 39 """
40 40 Write data to out in the following format:
41 41
42 42 data length (unsigned int),
43 43 data
44 44 """
45 45 def __init__(self, out, channel):
46 46 self.out = out
47 47 self.channel = channel
48 48
49 49 @property
50 50 def name(self):
51 51 return '<%c-channel>' % self.channel
52 52
53 53 def write(self, data):
54 54 if not data:
55 55 return
56 56 self.out.write(struct.pack('>cI', self.channel, len(data)))
57 57 self.out.write(data)
58 58 self.out.flush()
59 59
60 60 def __getattr__(self, attr):
61 61 if attr in ('isatty', 'fileno', 'tell', 'seek'):
62 62 raise AttributeError(attr)
63 63 return getattr(self.out, attr)
64 64
65 65 class channeledinput(object):
66 66 """
67 67 Read data from in_.
68 68
69 69 Requests for input are written to out in the following format:
70 70 channel identifier - 'I' for plain input, 'L' line based (1 byte)
71 71 how many bytes to send at most (unsigned int),
72 72
73 73 The client replies with:
74 74 data length (unsigned int), 0 meaning EOF
75 75 data
76 76 """
77 77
78 78 maxchunksize = 4 * 1024
79 79
80 80 def __init__(self, in_, out, channel):
81 81 self.in_ = in_
82 82 self.out = out
83 83 self.channel = channel
84 84
85 85 @property
86 86 def name(self):
87 87 return '<%c-channel>' % self.channel
88 88
89 89 def read(self, size=-1):
90 90 if size < 0:
91 91 # if we need to consume all the clients input, ask for 4k chunks
92 92 # so the pipe doesn't fill up risking a deadlock
93 93 size = self.maxchunksize
94 94 s = self._read(size, self.channel)
95 95 buf = s
96 96 while s:
97 97 s = self._read(size, self.channel)
98 98 buf += s
99 99
100 100 return buf
101 101 else:
102 102 return self._read(size, self.channel)
103 103
104 104 def _read(self, size, channel):
105 105 if not size:
106 106 return ''
107 107 assert size > 0
108 108
109 109 # tell the client we need at most size bytes
110 110 self.out.write(struct.pack('>cI', channel, size))
111 111 self.out.flush()
112 112
113 113 length = self.in_.read(4)
114 114 length = struct.unpack('>I', length)[0]
115 115 if not length:
116 116 return ''
117 117 else:
118 118 return self.in_.read(length)
119 119
120 120 def readline(self, size=-1):
121 121 if size < 0:
122 122 size = self.maxchunksize
123 123 s = self._read(size, 'L')
124 124 buf = s
125 125 # keep asking for more until there's either no more or
126 126 # we got a full line
127 127 while s and s[-1] != '\n':
128 128 s = self._read(size, 'L')
129 129 buf += s
130 130
131 131 return buf
132 132 else:
133 133 return self._read(size, 'L')
134 134
135 135 def __iter__(self):
136 136 return self
137 137
138 138 def next(self):
139 139 l = self.readline()
140 140 if not l:
141 141 raise StopIteration
142 142 return l
143 143
144 144 def __getattr__(self, attr):
145 145 if attr in ('isatty', 'fileno', 'tell', 'seek'):
146 146 raise AttributeError(attr)
147 147 return getattr(self.in_, attr)
148 148
149 149 class server(object):
150 150 """
151 151 Listens for commands on fin, runs them and writes the output on a channel
152 152 based stream to fout.
153 153 """
154 154 def __init__(self, ui, repo, fin, fout):
155 155 self.cwd = os.getcwd()
156 156
157 157 # developer config: cmdserver.log
158 158 logpath = ui.config("cmdserver", "log", None)
159 159 if logpath:
160 160 global logfile
161 161 if logpath == '-':
162 162 # write log on a special 'd' (debug) channel
163 163 logfile = channeledoutput(fout, 'd')
164 164 else:
165 165 logfile = open(logpath, 'a')
166 166
167 167 if repo:
168 168 # the ui here is really the repo ui so take its baseui so we don't
169 169 # end up with its local configuration
170 170 self.ui = repo.baseui
171 171 self.repo = repo
172 172 self.repoui = repo.ui
173 173 else:
174 174 self.ui = ui
175 175 self.repo = self.repoui = None
176 176
177 177 self.cerr = channeledoutput(fout, 'e')
178 178 self.cout = channeledoutput(fout, 'o')
179 179 self.cin = channeledinput(fin, fout, 'I')
180 180 self.cresult = channeledoutput(fout, 'r')
181 181
182 182 self.client = fin
183 183
184 184 def cleanup(self):
185 185 """release and restore resources taken during server session"""
186 186 pass
187 187
188 188 def _read(self, size):
189 189 if not size:
190 190 return ''
191 191
192 192 data = self.client.read(size)
193 193
194 194 # is the other end closed?
195 195 if not data:
196 196 raise EOFError
197 197
198 198 return data
199 199
200 200 def _readstr(self):
201 201 """read a string from the channel
202 202
203 203 format:
204 204 data length (uint32), data
205 205 """
206 206 length = struct.unpack('>I', self._read(4))[0]
207 207 if not length:
208 208 return ''
209 209 return self._read(length)
210 210
211 211 def _readlist(self):
212 212 """read a list of NULL separated strings from the channel"""
213 213 s = self._readstr()
214 214 if s:
215 215 return s.split('\0')
216 216 else:
217 217 return []
218 218
219 219 def runcommand(self):
220 220 """ reads a list of \0 terminated arguments, executes
221 221 and writes the return code to the result channel """
222 222 from . import dispatch # avoid cycle
223 223
224 224 args = self._readlist()
225 225
226 226 # copy the uis so changes (e.g. --config or --verbose) don't
227 227 # persist between requests
228 228 copiedui = self.ui.copy()
229 229 uis = [copiedui]
230 230 if self.repo:
231 231 self.repo.baseui = copiedui
232 232 # clone ui without using ui.copy because this is protected
233 233 repoui = self.repoui.__class__(self.repoui)
234 234 repoui.copy = copiedui.copy # redo copy protection
235 235 uis.append(repoui)
236 236 self.repo.ui = self.repo.dirstate._ui = repoui
237 237 self.repo.invalidateall()
238 238
239 239 for ui in uis:
240 240 ui.resetstate()
241 241 # any kind of interaction must use server channels, but chg may
242 242 # replace channels by fully functional tty files. so nontty is
243 243 # enforced only if cin is a channel.
244 244 if not util.safehasattr(self.cin, 'fileno'):
245 245 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
246 246
247 247 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
248 248 self.cout, self.cerr)
249 249
250 250 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
251 251
252 252 # restore old cwd
253 253 if '--cwd' in args:
254 254 os.chdir(self.cwd)
255 255
256 256 self.cresult.write(struct.pack('>i', int(ret)))
257 257
258 258 def getencoding(self):
259 259 """ writes the current encoding to the result channel """
260 260 self.cresult.write(encoding.encoding)
261 261
262 262 def serveone(self):
263 263 cmd = self.client.readline()[:-1]
264 264 if cmd:
265 265 handler = self.capabilities.get(cmd)
266 266 if handler:
267 267 handler(self)
268 268 else:
269 269 # clients are expected to check what commands are supported by
270 270 # looking at the servers capabilities
271 271 raise error.Abort(_('unknown command %s') % cmd)
272 272
273 273 return cmd != ''
274 274
275 275 capabilities = {'runcommand' : runcommand,
276 276 'getencoding' : getencoding}
277 277
278 278 def serve(self):
279 279 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
280 280 hellomsg += '\n'
281 281 hellomsg += 'encoding: ' + encoding.encoding
282 282 hellomsg += '\n'
283 283 hellomsg += 'pid: %d' % util.getpid()
284 284
285 285 # write the hello msg in -one- chunk
286 286 self.cout.write(hellomsg)
287 287
288 288 try:
289 289 while self.serveone():
290 290 pass
291 291 except EOFError:
292 292 # we'll get here if the client disconnected while we were reading
293 293 # its request
294 294 return 1
295 295
296 296 return 0
297 297
298 298 def _protectio(ui):
299 299 """ duplicates streams and redirect original to null if ui uses stdio """
300 300 ui.flush()
301 301 newfiles = []
302 302 nullfd = os.open(os.devnull, os.O_RDWR)
303 303 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
304 304 (ui.fout, sys.stdout, 'wb')]:
305 305 if f is sysf:
306 306 newfd = os.dup(f.fileno())
307 307 os.dup2(nullfd, f.fileno())
308 308 f = os.fdopen(newfd, mode)
309 309 newfiles.append(f)
310 310 os.close(nullfd)
311 311 return tuple(newfiles)
312 312
313 313 def _restoreio(ui, fin, fout):
314 314 """ restores streams from duplicated ones """
315 315 ui.flush()
316 316 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
317 317 if f is not uif:
318 318 os.dup2(f.fileno(), uif.fileno())
319 319 f.close()
320 320
321 321 class pipeservice(object):
322 322 def __init__(self, ui, repo, opts):
323 323 self.ui = ui
324 324 self.repo = repo
325 325
326 326 def init(self):
327 327 pass
328 328
329 329 def run(self):
330 330 ui = self.ui
331 331 # redirect stdio to null device so that broken extensions or in-process
332 332 # hooks will never cause corruption of channel protocol.
333 333 fin, fout = _protectio(ui)
334 334 try:
335 335 sv = server(ui, self.repo, fin, fout)
336 336 return sv.serve()
337 337 finally:
338 338 sv.cleanup()
339 339 _restoreio(ui, fin, fout)
340 340
341 class _requesthandler(socketserver.StreamRequestHandler):
341 class _requesthandler(socketserver.BaseRequestHandler):
342 342 def handle(self):
343 343 # use a different process group from the master process, making this
344 344 # process pass kernel "is_current_pgrp_orphaned" check so signals like
345 345 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
346 346 os.setpgid(0, 0)
347 347 # change random state otherwise forked request handlers would have a
348 348 # same state inherited from parent.
349 349 random.seed()
350 350 ui = self.server.ui
351
352 conn = self.request
353 fin = conn.makefile('rb')
354 fout = conn.makefile('wb')
351 355 sv = None
352 356 try:
353 sv = self._createcmdserver()
357 sv = self._createcmdserver(conn, fin, fout)
354 358 try:
355 359 sv.serve()
356 360 # handle exceptions that may be raised by command server. most of
357 361 # known exceptions are caught by dispatch.
358 362 except error.Abort as inst:
359 363 ui.warn(_('abort: %s\n') % inst)
360 364 except IOError as inst:
361 365 if inst.errno != errno.EPIPE:
362 366 raise
363 367 except KeyboardInterrupt:
364 368 pass
365 369 finally:
366 370 sv.cleanup()
367 371 except: # re-raises
368 372 # also write traceback to error channel. otherwise client cannot
369 373 # see it because it is written to server's stderr by default.
370 374 if sv:
371 375 cerr = sv.cerr
372 376 else:
373 cerr = channeledoutput(self.wfile, 'e')
377 cerr = channeledoutput(fout, 'e')
374 378 traceback.print_exc(file=cerr)
375 379 raise
376 380 finally:
381 fin.close()
382 try:
383 fout.close() # implicit flush() may cause another EPIPE
384 except IOError as inst:
385 if inst.errno != errno.EPIPE:
386 raise
377 387 # trigger __del__ since ForkingMixIn uses os._exit
378 388 gc.collect()
379 389
380 def _createcmdserver(self):
390 def _createcmdserver(self, conn, fin, fout):
381 391 ui = self.server.ui
382 392 repo = self.server.repo
383 return server(ui, repo, self.rfile, self.wfile)
393 return server(ui, repo, fin, fout)
384 394
385 395 class unixservice(object):
386 396 """
387 397 Listens on unix domain socket and forks server per connection
388 398 """
389 399 def __init__(self, ui, repo, opts):
390 400 self.ui = ui
391 401 self.repo = repo
392 402 self.address = opts['address']
393 403 if not util.safehasattr(socketserver, 'UnixStreamServer'):
394 404 raise error.Abort(_('unsupported platform'))
395 405 if not self.address:
396 406 raise error.Abort(_('no socket path specified with --address'))
397 407
398 408 def init(self):
399 409 class cls(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
400 410 ui = self.ui
401 411 repo = self.repo
402 412 self.server = cls(self.address, _requesthandler)
403 413 self.ui.status(_('listening at %s\n') % self.address)
404 414 self.ui.flush() # avoid buffering of status message
405 415
406 416 def _cleanup(self):
407 417 os.unlink(self.address)
408 418
409 419 def run(self):
410 420 try:
411 421 self.server.serve_forever()
412 422 finally:
413 423 self._cleanup()
414 424
415 425 _servicemap = {
416 426 'pipe': pipeservice,
417 427 'unix': unixservice,
418 428 }
419 429
420 430 def createservice(ui, repo, opts):
421 431 mode = opts['cmdserver']
422 432 try:
423 433 return _servicemap[mode](ui, repo, opts)
424 434 except KeyError:
425 435 raise error.Abort(_('unknown mode %s') % mode)
General Comments 0
You need to be logged in to leave comments. Login now