##// END OF EJS Templates
commandserver: backport handling of forking server from chgserver...
Yuya Nishihara -
r29513:e5b4d79a default
parent child Browse files
Show More
@@ -1,717 +1,675 b''
1 # chgserver.py - command server extension for cHg
1 # chgserver.py - command server extension for cHg
2 #
2 #
3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 """command server extension for cHg (EXPERIMENTAL)
8 """command server extension for cHg (EXPERIMENTAL)
9
9
10 'S' channel (read/write)
10 'S' channel (read/write)
11 propagate ui.system() request to client
11 propagate ui.system() request to client
12
12
13 'attachio' command
13 'attachio' command
14 attach client's stdio passed by sendmsg()
14 attach client's stdio passed by sendmsg()
15
15
16 'chdir' command
16 'chdir' command
17 change current directory
17 change current directory
18
18
19 'getpager' command
19 'getpager' command
20 checks if pager is enabled and which pager should be executed
20 checks if pager is enabled and which pager should be executed
21
21
22 'setenv' command
22 'setenv' command
23 replace os.environ completely
23 replace os.environ completely
24
24
25 'setumask' command
25 'setumask' command
26 set umask
26 set umask
27
27
28 'validate' command
28 'validate' command
29 reload the config and check if the server is up to date
29 reload the config and check if the server is up to date
30
30
31 Config
31 Config
32 ------
32 ------
33
33
34 ::
34 ::
35
35
36 [chgserver]
36 [chgserver]
37 idletimeout = 3600 # seconds, after which an idle server will exit
37 idletimeout = 3600 # seconds, after which an idle server will exit
38 skiphash = False # whether to skip config or env change checks
38 skiphash = False # whether to skip config or env change checks
39 """
39 """
40
40
41 from __future__ import absolute_import
41 from __future__ import absolute_import
42
42
43 import errno
43 import errno
44 import gc
45 import hashlib
44 import hashlib
46 import inspect
45 import inspect
47 import os
46 import os
48 import random
49 import re
47 import re
50 import signal
48 import signal
51 import struct
49 import struct
52 import sys
50 import sys
53 import threading
51 import threading
54 import time
52 import time
55 import traceback
56
53
57 from mercurial.i18n import _
54 from mercurial.i18n import _
58
55
59 from mercurial import (
56 from mercurial import (
60 cmdutil,
57 cmdutil,
61 commands,
58 commands,
62 commandserver,
59 commandserver,
63 dispatch,
60 dispatch,
64 error,
61 error,
65 extensions,
62 extensions,
66 osutil,
63 osutil,
67 util,
64 util,
68 )
65 )
69
66
70 socketserver = util.socketserver
67 socketserver = util.socketserver
71
68
72 # Note for extension authors: ONLY specify testedwith = 'internal' for
69 # Note for extension authors: ONLY specify testedwith = 'internal' for
73 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
70 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
74 # be specifying the version(s) of Mercurial they are tested with, or
71 # be specifying the version(s) of Mercurial they are tested with, or
75 # leave the attribute unspecified.
72 # leave the attribute unspecified.
76 testedwith = 'internal'
73 testedwith = 'internal'
77
74
78 _log = commandserver.log
75 _log = commandserver.log
79
76
80 def _hashlist(items):
77 def _hashlist(items):
81 """return sha1 hexdigest for a list"""
78 """return sha1 hexdigest for a list"""
82 return hashlib.sha1(str(items)).hexdigest()
79 return hashlib.sha1(str(items)).hexdigest()
83
80
84 # sensitive config sections affecting confighash
81 # sensitive config sections affecting confighash
85 _configsections = [
82 _configsections = [
86 'alias', # affects global state commands.table
83 'alias', # affects global state commands.table
87 'extdiff', # uisetup will register new commands
84 'extdiff', # uisetup will register new commands
88 'extensions',
85 'extensions',
89 ]
86 ]
90
87
91 # sensitive environment variables affecting confighash
88 # sensitive environment variables affecting confighash
92 _envre = re.compile(r'''\A(?:
89 _envre = re.compile(r'''\A(?:
93 CHGHG
90 CHGHG
94 |HG.*
91 |HG.*
95 |LANG(?:UAGE)?
92 |LANG(?:UAGE)?
96 |LC_.*
93 |LC_.*
97 |LD_.*
94 |LD_.*
98 |PATH
95 |PATH
99 |PYTHON.*
96 |PYTHON.*
100 |TERM(?:INFO)?
97 |TERM(?:INFO)?
101 |TZ
98 |TZ
102 )\Z''', re.X)
99 )\Z''', re.X)
103
100
104 def _confighash(ui):
101 def _confighash(ui):
105 """return a quick hash for detecting config/env changes
102 """return a quick hash for detecting config/env changes
106
103
107 confighash is the hash of sensitive config items and environment variables.
104 confighash is the hash of sensitive config items and environment variables.
108
105
109 for chgserver, it is designed that once confighash changes, the server is
106 for chgserver, it is designed that once confighash changes, the server is
110 not qualified to serve its client and should redirect the client to a new
107 not qualified to serve its client and should redirect the client to a new
111 server. different from mtimehash, confighash change will not mark the
108 server. different from mtimehash, confighash change will not mark the
112 server outdated and exit since the user can have different configs at the
109 server outdated and exit since the user can have different configs at the
113 same time.
110 same time.
114 """
111 """
115 sectionitems = []
112 sectionitems = []
116 for section in _configsections:
113 for section in _configsections:
117 sectionitems.append(ui.configitems(section))
114 sectionitems.append(ui.configitems(section))
118 sectionhash = _hashlist(sectionitems)
115 sectionhash = _hashlist(sectionitems)
119 envitems = [(k, v) for k, v in os.environ.iteritems() if _envre.match(k)]
116 envitems = [(k, v) for k, v in os.environ.iteritems() if _envre.match(k)]
120 envhash = _hashlist(sorted(envitems))
117 envhash = _hashlist(sorted(envitems))
121 return sectionhash[:6] + envhash[:6]
118 return sectionhash[:6] + envhash[:6]
122
119
123 def _getmtimepaths(ui):
120 def _getmtimepaths(ui):
124 """get a list of paths that should be checked to detect change
121 """get a list of paths that should be checked to detect change
125
122
126 The list will include:
123 The list will include:
127 - extensions (will not cover all files for complex extensions)
124 - extensions (will not cover all files for complex extensions)
128 - mercurial/__version__.py
125 - mercurial/__version__.py
129 - python binary
126 - python binary
130 """
127 """
131 modules = [m for n, m in extensions.extensions(ui)]
128 modules = [m for n, m in extensions.extensions(ui)]
132 try:
129 try:
133 from mercurial import __version__
130 from mercurial import __version__
134 modules.append(__version__)
131 modules.append(__version__)
135 except ImportError:
132 except ImportError:
136 pass
133 pass
137 files = [sys.executable]
134 files = [sys.executable]
138 for m in modules:
135 for m in modules:
139 try:
136 try:
140 files.append(inspect.getabsfile(m))
137 files.append(inspect.getabsfile(m))
141 except TypeError:
138 except TypeError:
142 pass
139 pass
143 return sorted(set(files))
140 return sorted(set(files))
144
141
145 def _mtimehash(paths):
142 def _mtimehash(paths):
146 """return a quick hash for detecting file changes
143 """return a quick hash for detecting file changes
147
144
148 mtimehash calls stat on given paths and calculate a hash based on size and
145 mtimehash calls stat on given paths and calculate a hash based on size and
149 mtime of each file. mtimehash does not read file content because reading is
146 mtime of each file. mtimehash does not read file content because reading is
150 expensive. therefore it's not 100% reliable for detecting content changes.
147 expensive. therefore it's not 100% reliable for detecting content changes.
151 it's possible to return different hashes for same file contents.
148 it's possible to return different hashes for same file contents.
152 it's also possible to return a same hash for different file contents for
149 it's also possible to return a same hash for different file contents for
153 some carefully crafted situation.
150 some carefully crafted situation.
154
151
155 for chgserver, it is designed that once mtimehash changes, the server is
152 for chgserver, it is designed that once mtimehash changes, the server is
156 considered outdated immediately and should no longer provide service.
153 considered outdated immediately and should no longer provide service.
157
154
158 mtimehash is not included in confighash because we only know the paths of
155 mtimehash is not included in confighash because we only know the paths of
159 extensions after importing them (there is imp.find_module but that faces
156 extensions after importing them (there is imp.find_module but that faces
160 race conditions). We need to calculate confighash without importing.
157 race conditions). We need to calculate confighash without importing.
161 """
158 """
162 def trystat(path):
159 def trystat(path):
163 try:
160 try:
164 st = os.stat(path)
161 st = os.stat(path)
165 return (st.st_mtime, st.st_size)
162 return (st.st_mtime, st.st_size)
166 except OSError:
163 except OSError:
167 # could be ENOENT, EPERM etc. not fatal in any case
164 # could be ENOENT, EPERM etc. not fatal in any case
168 pass
165 pass
169 return _hashlist(map(trystat, paths))[:12]
166 return _hashlist(map(trystat, paths))[:12]
170
167
171 class hashstate(object):
168 class hashstate(object):
172 """a structure storing confighash, mtimehash, paths used for mtimehash"""
169 """a structure storing confighash, mtimehash, paths used for mtimehash"""
173 def __init__(self, confighash, mtimehash, mtimepaths):
170 def __init__(self, confighash, mtimehash, mtimepaths):
174 self.confighash = confighash
171 self.confighash = confighash
175 self.mtimehash = mtimehash
172 self.mtimehash = mtimehash
176 self.mtimepaths = mtimepaths
173 self.mtimepaths = mtimepaths
177
174
178 @staticmethod
175 @staticmethod
179 def fromui(ui, mtimepaths=None):
176 def fromui(ui, mtimepaths=None):
180 if mtimepaths is None:
177 if mtimepaths is None:
181 mtimepaths = _getmtimepaths(ui)
178 mtimepaths = _getmtimepaths(ui)
182 confighash = _confighash(ui)
179 confighash = _confighash(ui)
183 mtimehash = _mtimehash(mtimepaths)
180 mtimehash = _mtimehash(mtimepaths)
184 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
181 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
185 return hashstate(confighash, mtimehash, mtimepaths)
182 return hashstate(confighash, mtimehash, mtimepaths)
186
183
187 # copied from hgext/pager.py:uisetup()
184 # copied from hgext/pager.py:uisetup()
188 def _setuppagercmd(ui, options, cmd):
185 def _setuppagercmd(ui, options, cmd):
189 if not ui.formatted():
186 if not ui.formatted():
190 return
187 return
191
188
192 p = ui.config("pager", "pager", os.environ.get("PAGER"))
189 p = ui.config("pager", "pager", os.environ.get("PAGER"))
193 usepager = False
190 usepager = False
194 always = util.parsebool(options['pager'])
191 always = util.parsebool(options['pager'])
195 auto = options['pager'] == 'auto'
192 auto = options['pager'] == 'auto'
196
193
197 if not p:
194 if not p:
198 pass
195 pass
199 elif always:
196 elif always:
200 usepager = True
197 usepager = True
201 elif not auto:
198 elif not auto:
202 usepager = False
199 usepager = False
203 else:
200 else:
204 attended = ['annotate', 'cat', 'diff', 'export', 'glog', 'log', 'qdiff']
201 attended = ['annotate', 'cat', 'diff', 'export', 'glog', 'log', 'qdiff']
205 attend = ui.configlist('pager', 'attend', attended)
202 attend = ui.configlist('pager', 'attend', attended)
206 ignore = ui.configlist('pager', 'ignore')
203 ignore = ui.configlist('pager', 'ignore')
207 cmds, _ = cmdutil.findcmd(cmd, commands.table)
204 cmds, _ = cmdutil.findcmd(cmd, commands.table)
208
205
209 for cmd in cmds:
206 for cmd in cmds:
210 var = 'attend-%s' % cmd
207 var = 'attend-%s' % cmd
211 if ui.config('pager', var):
208 if ui.config('pager', var):
212 usepager = ui.configbool('pager', var)
209 usepager = ui.configbool('pager', var)
213 break
210 break
214 if (cmd in attend or
211 if (cmd in attend or
215 (cmd not in ignore and not attend)):
212 (cmd not in ignore and not attend)):
216 usepager = True
213 usepager = True
217 break
214 break
218
215
219 if usepager:
216 if usepager:
220 ui.setconfig('ui', 'formatted', ui.formatted(), 'pager')
217 ui.setconfig('ui', 'formatted', ui.formatted(), 'pager')
221 ui.setconfig('ui', 'interactive', False, 'pager')
218 ui.setconfig('ui', 'interactive', False, 'pager')
222 return p
219 return p
223
220
224 def _newchgui(srcui, csystem):
221 def _newchgui(srcui, csystem):
225 class chgui(srcui.__class__):
222 class chgui(srcui.__class__):
226 def __init__(self, src=None):
223 def __init__(self, src=None):
227 super(chgui, self).__init__(src)
224 super(chgui, self).__init__(src)
228 if src:
225 if src:
229 self._csystem = getattr(src, '_csystem', csystem)
226 self._csystem = getattr(src, '_csystem', csystem)
230 else:
227 else:
231 self._csystem = csystem
228 self._csystem = csystem
232
229
233 def system(self, cmd, environ=None, cwd=None, onerr=None,
230 def system(self, cmd, environ=None, cwd=None, onerr=None,
234 errprefix=None):
231 errprefix=None):
235 # fallback to the original system method if the output needs to be
232 # fallback to the original system method if the output needs to be
236 # captured (to self._buffers), or the output stream is not stdout
233 # captured (to self._buffers), or the output stream is not stdout
237 # (e.g. stderr, cStringIO), because the chg client is not aware of
234 # (e.g. stderr, cStringIO), because the chg client is not aware of
238 # these situations and will behave differently (write to stdout).
235 # these situations and will behave differently (write to stdout).
239 if (any(s[1] for s in self._bufferstates)
236 if (any(s[1] for s in self._bufferstates)
240 or not util.safehasattr(self.fout, 'fileno')
237 or not util.safehasattr(self.fout, 'fileno')
241 or self.fout.fileno() != sys.stdout.fileno()):
238 or self.fout.fileno() != sys.stdout.fileno()):
242 return super(chgui, self).system(cmd, environ, cwd, onerr,
239 return super(chgui, self).system(cmd, environ, cwd, onerr,
243 errprefix)
240 errprefix)
244 # copied from mercurial/util.py:system()
241 # copied from mercurial/util.py:system()
245 self.flush()
242 self.flush()
246 def py2shell(val):
243 def py2shell(val):
247 if val is None or val is False:
244 if val is None or val is False:
248 return '0'
245 return '0'
249 if val is True:
246 if val is True:
250 return '1'
247 return '1'
251 return str(val)
248 return str(val)
252 env = os.environ.copy()
249 env = os.environ.copy()
253 if environ:
250 if environ:
254 env.update((k, py2shell(v)) for k, v in environ.iteritems())
251 env.update((k, py2shell(v)) for k, v in environ.iteritems())
255 env['HG'] = util.hgexecutable()
252 env['HG'] = util.hgexecutable()
256 rc = self._csystem(cmd, env, cwd)
253 rc = self._csystem(cmd, env, cwd)
257 if rc and onerr:
254 if rc and onerr:
258 errmsg = '%s %s' % (os.path.basename(cmd.split(None, 1)[0]),
255 errmsg = '%s %s' % (os.path.basename(cmd.split(None, 1)[0]),
259 util.explainexit(rc)[0])
256 util.explainexit(rc)[0])
260 if errprefix:
257 if errprefix:
261 errmsg = '%s: %s' % (errprefix, errmsg)
258 errmsg = '%s: %s' % (errprefix, errmsg)
262 raise onerr(errmsg)
259 raise onerr(errmsg)
263 return rc
260 return rc
264
261
265 return chgui(srcui)
262 return chgui(srcui)
266
263
267 def _loadnewui(srcui, args):
264 def _loadnewui(srcui, args):
268 newui = srcui.__class__()
265 newui = srcui.__class__()
269 for a in ['fin', 'fout', 'ferr', 'environ']:
266 for a in ['fin', 'fout', 'ferr', 'environ']:
270 setattr(newui, a, getattr(srcui, a))
267 setattr(newui, a, getattr(srcui, a))
271 if util.safehasattr(srcui, '_csystem'):
268 if util.safehasattr(srcui, '_csystem'):
272 newui._csystem = srcui._csystem
269 newui._csystem = srcui._csystem
273
270
274 # internal config: extensions.chgserver
271 # internal config: extensions.chgserver
275 newui.setconfig('extensions', 'chgserver',
272 newui.setconfig('extensions', 'chgserver',
276 srcui.config('extensions', 'chgserver'), '--config')
273 srcui.config('extensions', 'chgserver'), '--config')
277
274
278 # command line args
275 # command line args
279 args = args[:]
276 args = args[:]
280 dispatch._parseconfig(newui, dispatch._earlygetopt(['--config'], args))
277 dispatch._parseconfig(newui, dispatch._earlygetopt(['--config'], args))
281
278
282 # stolen from tortoisehg.util.copydynamicconfig()
279 # stolen from tortoisehg.util.copydynamicconfig()
283 for section, name, value in srcui.walkconfig():
280 for section, name, value in srcui.walkconfig():
284 source = srcui.configsource(section, name)
281 source = srcui.configsource(section, name)
285 if ':' in source or source == '--config':
282 if ':' in source or source == '--config':
286 # path:line or command line
283 # path:line or command line
287 continue
284 continue
288 if source == 'none':
285 if source == 'none':
289 # ui.configsource returns 'none' by default
286 # ui.configsource returns 'none' by default
290 source = ''
287 source = ''
291 newui.setconfig(section, name, value, source)
288 newui.setconfig(section, name, value, source)
292
289
293 # load wd and repo config, copied from dispatch.py
290 # load wd and repo config, copied from dispatch.py
294 cwds = dispatch._earlygetopt(['--cwd'], args)
291 cwds = dispatch._earlygetopt(['--cwd'], args)
295 cwd = cwds and os.path.realpath(cwds[-1]) or None
292 cwd = cwds and os.path.realpath(cwds[-1]) or None
296 rpath = dispatch._earlygetopt(["-R", "--repository", "--repo"], args)
293 rpath = dispatch._earlygetopt(["-R", "--repository", "--repo"], args)
297 path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)
294 path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)
298
295
299 return (newui, newlui)
296 return (newui, newlui)
300
297
301 class channeledsystem(object):
298 class channeledsystem(object):
302 """Propagate ui.system() request in the following format:
299 """Propagate ui.system() request in the following format:
303
300
304 payload length (unsigned int),
301 payload length (unsigned int),
305 cmd, '\0',
302 cmd, '\0',
306 cwd, '\0',
303 cwd, '\0',
307 envkey, '=', val, '\0',
304 envkey, '=', val, '\0',
308 ...
305 ...
309 envkey, '=', val
306 envkey, '=', val
310
307
311 and waits:
308 and waits:
312
309
313 exitcode length (unsigned int),
310 exitcode length (unsigned int),
314 exitcode (int)
311 exitcode (int)
315 """
312 """
316 def __init__(self, in_, out, channel):
313 def __init__(self, in_, out, channel):
317 self.in_ = in_
314 self.in_ = in_
318 self.out = out
315 self.out = out
319 self.channel = channel
316 self.channel = channel
320
317
321 def __call__(self, cmd, environ, cwd):
318 def __call__(self, cmd, environ, cwd):
322 args = [util.quotecommand(cmd), os.path.abspath(cwd or '.')]
319 args = [util.quotecommand(cmd), os.path.abspath(cwd or '.')]
323 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
320 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
324 data = '\0'.join(args)
321 data = '\0'.join(args)
325 self.out.write(struct.pack('>cI', self.channel, len(data)))
322 self.out.write(struct.pack('>cI', self.channel, len(data)))
326 self.out.write(data)
323 self.out.write(data)
327 self.out.flush()
324 self.out.flush()
328
325
329 length = self.in_.read(4)
326 length = self.in_.read(4)
330 length, = struct.unpack('>I', length)
327 length, = struct.unpack('>I', length)
331 if length != 4:
328 if length != 4:
332 raise error.Abort(_('invalid response'))
329 raise error.Abort(_('invalid response'))
333 rc, = struct.unpack('>i', self.in_.read(4))
330 rc, = struct.unpack('>i', self.in_.read(4))
334 return rc
331 return rc
335
332
336 _iochannels = [
333 _iochannels = [
337 # server.ch, ui.fp, mode
334 # server.ch, ui.fp, mode
338 ('cin', 'fin', 'rb'),
335 ('cin', 'fin', 'rb'),
339 ('cout', 'fout', 'wb'),
336 ('cout', 'fout', 'wb'),
340 ('cerr', 'ferr', 'wb'),
337 ('cerr', 'ferr', 'wb'),
341 ]
338 ]
342
339
343 class chgcmdserver(commandserver.server):
340 class chgcmdserver(commandserver.server):
344 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
341 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
345 super(chgcmdserver, self).__init__(
342 super(chgcmdserver, self).__init__(
346 _newchgui(ui, channeledsystem(fin, fout, 'S')), repo, fin, fout)
343 _newchgui(ui, channeledsystem(fin, fout, 'S')), repo, fin, fout)
347 self.clientsock = sock
344 self.clientsock = sock
348 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
345 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
349 self.hashstate = hashstate
346 self.hashstate = hashstate
350 self.baseaddress = baseaddress
347 self.baseaddress = baseaddress
351 if hashstate is not None:
348 if hashstate is not None:
352 self.capabilities = self.capabilities.copy()
349 self.capabilities = self.capabilities.copy()
353 self.capabilities['validate'] = chgcmdserver.validate
350 self.capabilities['validate'] = chgcmdserver.validate
354
351
355 def cleanup(self):
352 def cleanup(self):
356 super(chgcmdserver, self).cleanup()
353 super(chgcmdserver, self).cleanup()
357 # dispatch._runcatch() does not flush outputs if exception is not
354 # dispatch._runcatch() does not flush outputs if exception is not
358 # handled by dispatch._dispatch()
355 # handled by dispatch._dispatch()
359 self.ui.flush()
356 self.ui.flush()
360 self._restoreio()
357 self._restoreio()
361
358
362 def attachio(self):
359 def attachio(self):
363 """Attach to client's stdio passed via unix domain socket; all
360 """Attach to client's stdio passed via unix domain socket; all
364 channels except cresult will no longer be used
361 channels except cresult will no longer be used
365 """
362 """
366 # tell client to sendmsg() with 1-byte payload, which makes it
363 # tell client to sendmsg() with 1-byte payload, which makes it
367 # distinctive from "attachio\n" command consumed by client.read()
364 # distinctive from "attachio\n" command consumed by client.read()
368 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
365 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
369 clientfds = osutil.recvfds(self.clientsock.fileno())
366 clientfds = osutil.recvfds(self.clientsock.fileno())
370 _log('received fds: %r\n' % clientfds)
367 _log('received fds: %r\n' % clientfds)
371
368
372 ui = self.ui
369 ui = self.ui
373 ui.flush()
370 ui.flush()
374 first = self._saveio()
371 first = self._saveio()
375 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
372 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
376 assert fd > 0
373 assert fd > 0
377 fp = getattr(ui, fn)
374 fp = getattr(ui, fn)
378 os.dup2(fd, fp.fileno())
375 os.dup2(fd, fp.fileno())
379 os.close(fd)
376 os.close(fd)
380 if not first:
377 if not first:
381 continue
378 continue
382 # reset buffering mode when client is first attached. as we want
379 # reset buffering mode when client is first attached. as we want
383 # to see output immediately on pager, the mode stays unchanged
380 # to see output immediately on pager, the mode stays unchanged
384 # when client re-attached. ferr is unchanged because it should
381 # when client re-attached. ferr is unchanged because it should
385 # be unbuffered no matter if it is a tty or not.
382 # be unbuffered no matter if it is a tty or not.
386 if fn == 'ferr':
383 if fn == 'ferr':
387 newfp = fp
384 newfp = fp
388 else:
385 else:
389 # make it line buffered explicitly because the default is
386 # make it line buffered explicitly because the default is
390 # decided on first write(), where fout could be a pager.
387 # decided on first write(), where fout could be a pager.
391 if fp.isatty():
388 if fp.isatty():
392 bufsize = 1 # line buffered
389 bufsize = 1 # line buffered
393 else:
390 else:
394 bufsize = -1 # system default
391 bufsize = -1 # system default
395 newfp = os.fdopen(fp.fileno(), mode, bufsize)
392 newfp = os.fdopen(fp.fileno(), mode, bufsize)
396 setattr(ui, fn, newfp)
393 setattr(ui, fn, newfp)
397 setattr(self, cn, newfp)
394 setattr(self, cn, newfp)
398
395
399 self.cresult.write(struct.pack('>i', len(clientfds)))
396 self.cresult.write(struct.pack('>i', len(clientfds)))
400
397
401 def _saveio(self):
398 def _saveio(self):
402 if self._oldios:
399 if self._oldios:
403 return False
400 return False
404 ui = self.ui
401 ui = self.ui
405 for cn, fn, _mode in _iochannels:
402 for cn, fn, _mode in _iochannels:
406 ch = getattr(self, cn)
403 ch = getattr(self, cn)
407 fp = getattr(ui, fn)
404 fp = getattr(ui, fn)
408 fd = os.dup(fp.fileno())
405 fd = os.dup(fp.fileno())
409 self._oldios.append((ch, fp, fd))
406 self._oldios.append((ch, fp, fd))
410 return True
407 return True
411
408
412 def _restoreio(self):
409 def _restoreio(self):
413 ui = self.ui
410 ui = self.ui
414 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
411 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
415 newfp = getattr(ui, fn)
412 newfp = getattr(ui, fn)
416 # close newfp while it's associated with client; otherwise it
413 # close newfp while it's associated with client; otherwise it
417 # would be closed when newfp is deleted
414 # would be closed when newfp is deleted
418 if newfp is not fp:
415 if newfp is not fp:
419 newfp.close()
416 newfp.close()
420 # restore original fd: fp is open again
417 # restore original fd: fp is open again
421 os.dup2(fd, fp.fileno())
418 os.dup2(fd, fp.fileno())
422 os.close(fd)
419 os.close(fd)
423 setattr(self, cn, ch)
420 setattr(self, cn, ch)
424 setattr(ui, fn, fp)
421 setattr(ui, fn, fp)
425 del self._oldios[:]
422 del self._oldios[:]
426
423
427 def validate(self):
424 def validate(self):
428 """Reload the config and check if the server is up to date
425 """Reload the config and check if the server is up to date
429
426
430 Read a list of '\0' separated arguments.
427 Read a list of '\0' separated arguments.
431 Write a non-empty list of '\0' separated instruction strings or '\0'
428 Write a non-empty list of '\0' separated instruction strings or '\0'
432 if the list is empty.
429 if the list is empty.
433 An instruction string could be either:
430 An instruction string could be either:
434 - "unlink $path", the client should unlink the path to stop the
431 - "unlink $path", the client should unlink the path to stop the
435 outdated server.
432 outdated server.
436 - "redirect $path", the client should attempt to connect to $path
433 - "redirect $path", the client should attempt to connect to $path
437 first. If it does not work, start a new server. It implies
434 first. If it does not work, start a new server. It implies
438 "reconnect".
435 "reconnect".
439 - "exit $n", the client should exit directly with code n.
436 - "exit $n", the client should exit directly with code n.
440 This may happen if we cannot parse the config.
437 This may happen if we cannot parse the config.
441 - "reconnect", the client should close the connection and
438 - "reconnect", the client should close the connection and
442 reconnect.
439 reconnect.
443 If neither "reconnect" nor "redirect" is included in the instruction
440 If neither "reconnect" nor "redirect" is included in the instruction
444 list, the client can continue with this server after completing all
441 list, the client can continue with this server after completing all
445 the instructions.
442 the instructions.
446 """
443 """
447 args = self._readlist()
444 args = self._readlist()
448 try:
445 try:
449 self.ui, lui = _loadnewui(self.ui, args)
446 self.ui, lui = _loadnewui(self.ui, args)
450 except error.ParseError as inst:
447 except error.ParseError as inst:
451 dispatch._formatparse(self.ui.warn, inst)
448 dispatch._formatparse(self.ui.warn, inst)
452 self.ui.flush()
449 self.ui.flush()
453 self.cresult.write('exit 255')
450 self.cresult.write('exit 255')
454 return
451 return
455 newhash = hashstate.fromui(lui, self.hashstate.mtimepaths)
452 newhash = hashstate.fromui(lui, self.hashstate.mtimepaths)
456 insts = []
453 insts = []
457 if newhash.mtimehash != self.hashstate.mtimehash:
454 if newhash.mtimehash != self.hashstate.mtimehash:
458 addr = _hashaddress(self.baseaddress, self.hashstate.confighash)
455 addr = _hashaddress(self.baseaddress, self.hashstate.confighash)
459 insts.append('unlink %s' % addr)
456 insts.append('unlink %s' % addr)
460 # mtimehash is empty if one or more extensions fail to load.
457 # mtimehash is empty if one or more extensions fail to load.
461 # to be compatible with hg, still serve the client this time.
458 # to be compatible with hg, still serve the client this time.
462 if self.hashstate.mtimehash:
459 if self.hashstate.mtimehash:
463 insts.append('reconnect')
460 insts.append('reconnect')
464 if newhash.confighash != self.hashstate.confighash:
461 if newhash.confighash != self.hashstate.confighash:
465 addr = _hashaddress(self.baseaddress, newhash.confighash)
462 addr = _hashaddress(self.baseaddress, newhash.confighash)
466 insts.append('redirect %s' % addr)
463 insts.append('redirect %s' % addr)
467 _log('validate: %s\n' % insts)
464 _log('validate: %s\n' % insts)
468 self.cresult.write('\0'.join(insts) or '\0')
465 self.cresult.write('\0'.join(insts) or '\0')
469
466
470 def chdir(self):
467 def chdir(self):
471 """Change current directory
468 """Change current directory
472
469
473 Note that the behavior of --cwd option is bit different from this.
470 Note that the behavior of --cwd option is bit different from this.
474 It does not affect --config parameter.
471 It does not affect --config parameter.
475 """
472 """
476 path = self._readstr()
473 path = self._readstr()
477 if not path:
474 if not path:
478 return
475 return
479 _log('chdir to %r\n' % path)
476 _log('chdir to %r\n' % path)
480 os.chdir(path)
477 os.chdir(path)
481
478
482 def setumask(self):
479 def setumask(self):
483 """Change umask"""
480 """Change umask"""
484 mask = struct.unpack('>I', self._read(4))[0]
481 mask = struct.unpack('>I', self._read(4))[0]
485 _log('setumask %r\n' % mask)
482 _log('setumask %r\n' % mask)
486 os.umask(mask)
483 os.umask(mask)
487
484
488 def getpager(self):
485 def getpager(self):
489 """Read cmdargs and write pager command to r-channel if enabled
486 """Read cmdargs and write pager command to r-channel if enabled
490
487
491 If pager isn't enabled, this writes '\0' because channeledoutput
488 If pager isn't enabled, this writes '\0' because channeledoutput
492 does not allow to write empty data.
489 does not allow to write empty data.
493 """
490 """
494 args = self._readlist()
491 args = self._readlist()
495 try:
492 try:
496 cmd, _func, args, options, _cmdoptions = dispatch._parse(self.ui,
493 cmd, _func, args, options, _cmdoptions = dispatch._parse(self.ui,
497 args)
494 args)
498 except (error.Abort, error.AmbiguousCommand, error.CommandError,
495 except (error.Abort, error.AmbiguousCommand, error.CommandError,
499 error.UnknownCommand):
496 error.UnknownCommand):
500 cmd = None
497 cmd = None
501 options = {}
498 options = {}
502 if not cmd or 'pager' not in options:
499 if not cmd or 'pager' not in options:
503 self.cresult.write('\0')
500 self.cresult.write('\0')
504 return
501 return
505
502
506 pagercmd = _setuppagercmd(self.ui, options, cmd)
503 pagercmd = _setuppagercmd(self.ui, options, cmd)
507 if pagercmd:
504 if pagercmd:
508 # Python's SIGPIPE is SIG_IGN by default. change to SIG_DFL so
505 # Python's SIGPIPE is SIG_IGN by default. change to SIG_DFL so
509 # we can exit if the pipe to the pager is closed
506 # we can exit if the pipe to the pager is closed
510 if util.safehasattr(signal, 'SIGPIPE') and \
507 if util.safehasattr(signal, 'SIGPIPE') and \
511 signal.getsignal(signal.SIGPIPE) == signal.SIG_IGN:
508 signal.getsignal(signal.SIGPIPE) == signal.SIG_IGN:
512 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
509 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
513 self.cresult.write(pagercmd)
510 self.cresult.write(pagercmd)
514 else:
511 else:
515 self.cresult.write('\0')
512 self.cresult.write('\0')
516
513
517 def setenv(self):
514 def setenv(self):
518 """Clear and update os.environ
515 """Clear and update os.environ
519
516
520 Note that not all variables can make an effect on the running process.
517 Note that not all variables can make an effect on the running process.
521 """
518 """
522 l = self._readlist()
519 l = self._readlist()
523 try:
520 try:
524 newenv = dict(s.split('=', 1) for s in l)
521 newenv = dict(s.split('=', 1) for s in l)
525 except ValueError:
522 except ValueError:
526 raise ValueError('unexpected value in setenv request')
523 raise ValueError('unexpected value in setenv request')
527 _log('setenv: %r\n' % sorted(newenv.keys()))
524 _log('setenv: %r\n' % sorted(newenv.keys()))
528 os.environ.clear()
525 os.environ.clear()
529 os.environ.update(newenv)
526 os.environ.update(newenv)
530
527
531 capabilities = commandserver.server.capabilities.copy()
528 capabilities = commandserver.server.capabilities.copy()
532 capabilities.update({'attachio': attachio,
529 capabilities.update({'attachio': attachio,
533 'chdir': chdir,
530 'chdir': chdir,
534 'getpager': getpager,
531 'getpager': getpager,
535 'setenv': setenv,
532 'setenv': setenv,
536 'setumask': setumask})
533 'setumask': setumask})
537
534
538 # copied from mercurial/commandserver.py
535 class _requesthandler(commandserver._requesthandler):
539 class _requesthandler(socketserver.StreamRequestHandler):
540 def handle(self):
541 # use a different process group from the master process, making this
542 # process pass kernel "is_current_pgrp_orphaned" check so signals like
543 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
544 os.setpgid(0, 0)
545 # change random state otherwise forked request handlers would have a
546 # same state inherited from parent.
547 random.seed()
548 ui = self.server.ui
549 sv = None
550 try:
551 sv = self._createcmdserver()
552 try:
553 sv.serve()
554 # handle exceptions that may be raised by command server. most of
555 # known exceptions are caught by dispatch.
556 except error.Abort as inst:
557 ui.warn(_('abort: %s\n') % inst)
558 except IOError as inst:
559 if inst.errno != errno.EPIPE:
560 raise
561 except KeyboardInterrupt:
562 pass
563 finally:
564 sv.cleanup()
565 except: # re-raises
566 # also write traceback to error channel. otherwise client cannot
567 # see it because it is written to server's stderr by default.
568 if sv:
569 cerr = sv.cerr
570 else:
571 cerr = commandserver.channeledoutput(self.wfile, 'e')
572 traceback.print_exc(file=cerr)
573 raise
574 finally:
575 # trigger __del__ since ForkingMixIn uses os._exit
576 gc.collect()
577
578 def _createcmdserver(self):
536 def _createcmdserver(self):
579 ui = self.server.ui
537 ui = self.server.ui
580 repo = self.server.repo
538 repo = self.server.repo
581 return chgcmdserver(ui, repo, self.rfile, self.wfile, self.connection,
539 return chgcmdserver(ui, repo, self.rfile, self.wfile, self.connection,
582 self.server.hashstate, self.server.baseaddress)
540 self.server.hashstate, self.server.baseaddress)
583
541
584 def _tempaddress(address):
542 def _tempaddress(address):
585 return '%s.%d.tmp' % (address, os.getpid())
543 return '%s.%d.tmp' % (address, os.getpid())
586
544
587 def _hashaddress(address, hashstr):
545 def _hashaddress(address, hashstr):
588 return '%s-%s' % (address, hashstr)
546 return '%s-%s' % (address, hashstr)
589
547
590 class AutoExitMixIn: # use old-style to comply with SocketServer design
548 class AutoExitMixIn: # use old-style to comply with SocketServer design
591 lastactive = time.time()
549 lastactive = time.time()
592 idletimeout = 3600 # default 1 hour
550 idletimeout = 3600 # default 1 hour
593
551
594 def startautoexitthread(self):
552 def startautoexitthread(self):
595 # note: the auto-exit check here is cheap enough to not use a thread,
553 # note: the auto-exit check here is cheap enough to not use a thread,
596 # be done in serve_forever. however SocketServer is hook-unfriendly,
554 # be done in serve_forever. however SocketServer is hook-unfriendly,
597 # you simply cannot hook serve_forever without copying a lot of code.
555 # you simply cannot hook serve_forever without copying a lot of code.
598 # besides, serve_forever's docstring suggests using thread.
556 # besides, serve_forever's docstring suggests using thread.
599 thread = threading.Thread(target=self._autoexitloop)
557 thread = threading.Thread(target=self._autoexitloop)
600 thread.daemon = True
558 thread.daemon = True
601 thread.start()
559 thread.start()
602
560
603 def _autoexitloop(self, interval=1):
561 def _autoexitloop(self, interval=1):
604 while True:
562 while True:
605 time.sleep(interval)
563 time.sleep(interval)
606 if not self.issocketowner():
564 if not self.issocketowner():
607 _log('%s is not owned, exiting.\n' % self.server_address)
565 _log('%s is not owned, exiting.\n' % self.server_address)
608 break
566 break
609 if time.time() - self.lastactive > self.idletimeout:
567 if time.time() - self.lastactive > self.idletimeout:
610 _log('being idle too long. exiting.\n')
568 _log('being idle too long. exiting.\n')
611 break
569 break
612 self.shutdown()
570 self.shutdown()
613
571
614 def process_request(self, request, address):
572 def process_request(self, request, address):
615 self.lastactive = time.time()
573 self.lastactive = time.time()
616 return socketserver.ForkingMixIn.process_request(
574 return socketserver.ForkingMixIn.process_request(
617 self, request, address)
575 self, request, address)
618
576
619 def server_bind(self):
577 def server_bind(self):
620 # use a unique temp address so we can stat the file and do ownership
578 # use a unique temp address so we can stat the file and do ownership
621 # check later
579 # check later
622 tempaddress = _tempaddress(self.server_address)
580 tempaddress = _tempaddress(self.server_address)
623 # use relative path instead of full path at bind() if possible, since
581 # use relative path instead of full path at bind() if possible, since
624 # AF_UNIX path has very small length limit (107 chars) on common
582 # AF_UNIX path has very small length limit (107 chars) on common
625 # platforms (see sys/un.h)
583 # platforms (see sys/un.h)
626 dirname, basename = os.path.split(tempaddress)
584 dirname, basename = os.path.split(tempaddress)
627 bakwdfd = None
585 bakwdfd = None
628 if dirname:
586 if dirname:
629 bakwdfd = os.open('.', os.O_DIRECTORY)
587 bakwdfd = os.open('.', os.O_DIRECTORY)
630 os.chdir(dirname)
588 os.chdir(dirname)
631 self.socket.bind(basename)
589 self.socket.bind(basename)
632 self._socketstat = os.stat(basename)
590 self._socketstat = os.stat(basename)
633 # rename will replace the old socket file if exists atomically. the
591 # rename will replace the old socket file if exists atomically. the
634 # old server will detect ownership change and exit.
592 # old server will detect ownership change and exit.
635 util.rename(basename, self.server_address)
593 util.rename(basename, self.server_address)
636 if bakwdfd:
594 if bakwdfd:
637 os.fchdir(bakwdfd)
595 os.fchdir(bakwdfd)
638 os.close(bakwdfd)
596 os.close(bakwdfd)
639
597
640 def issocketowner(self):
598 def issocketowner(self):
641 try:
599 try:
642 stat = os.stat(self.server_address)
600 stat = os.stat(self.server_address)
643 return (stat.st_ino == self._socketstat.st_ino and
601 return (stat.st_ino == self._socketstat.st_ino and
644 stat.st_mtime == self._socketstat.st_mtime)
602 stat.st_mtime == self._socketstat.st_mtime)
645 except OSError:
603 except OSError:
646 return False
604 return False
647
605
648 def unlinksocketfile(self):
606 def unlinksocketfile(self):
649 if not self.issocketowner():
607 if not self.issocketowner():
650 return
608 return
651 # it is possible to have a race condition here that we may
609 # it is possible to have a race condition here that we may
652 # remove another server's socket file. but that's okay
610 # remove another server's socket file. but that's okay
653 # since that server will detect and exit automatically and
611 # since that server will detect and exit automatically and
654 # the client will start a new server on demand.
612 # the client will start a new server on demand.
655 try:
613 try:
656 os.unlink(self.server_address)
614 os.unlink(self.server_address)
657 except OSError as exc:
615 except OSError as exc:
658 if exc.errno != errno.ENOENT:
616 if exc.errno != errno.ENOENT:
659 raise
617 raise
660
618
661 class chgunixservice(commandserver.unixservice):
619 class chgunixservice(commandserver.unixservice):
662 def init(self):
620 def init(self):
663 if self.repo:
621 if self.repo:
664 # one chgserver can serve multiple repos. drop repo infomation
622 # one chgserver can serve multiple repos. drop repo infomation
665 self.ui.setconfig('bundle', 'mainreporoot', '', 'repo')
623 self.ui.setconfig('bundle', 'mainreporoot', '', 'repo')
666 self.repo = None
624 self.repo = None
667 self._inithashstate()
625 self._inithashstate()
668 self._checkextensions()
626 self._checkextensions()
669 class cls(AutoExitMixIn, socketserver.ForkingMixIn,
627 class cls(AutoExitMixIn, socketserver.ForkingMixIn,
670 socketserver.UnixStreamServer):
628 socketserver.UnixStreamServer):
671 ui = self.ui
629 ui = self.ui
672 repo = self.repo
630 repo = self.repo
673 hashstate = self.hashstate
631 hashstate = self.hashstate
674 baseaddress = self.baseaddress
632 baseaddress = self.baseaddress
675 self.server = cls(self.address, _requesthandler)
633 self.server = cls(self.address, _requesthandler)
676 self.server.idletimeout = self.ui.configint(
634 self.server.idletimeout = self.ui.configint(
677 'chgserver', 'idletimeout', self.server.idletimeout)
635 'chgserver', 'idletimeout', self.server.idletimeout)
678 self.server.startautoexitthread()
636 self.server.startautoexitthread()
679 self._createsymlink()
637 self._createsymlink()
680
638
681 def _inithashstate(self):
639 def _inithashstate(self):
682 self.baseaddress = self.address
640 self.baseaddress = self.address
683 if self.ui.configbool('chgserver', 'skiphash', False):
641 if self.ui.configbool('chgserver', 'skiphash', False):
684 self.hashstate = None
642 self.hashstate = None
685 return
643 return
686 self.hashstate = hashstate.fromui(self.ui)
644 self.hashstate = hashstate.fromui(self.ui)
687 self.address = _hashaddress(self.address, self.hashstate.confighash)
645 self.address = _hashaddress(self.address, self.hashstate.confighash)
688
646
689 def _checkextensions(self):
647 def _checkextensions(self):
690 if not self.hashstate:
648 if not self.hashstate:
691 return
649 return
692 if extensions.notloaded():
650 if extensions.notloaded():
693 # one or more extensions failed to load. mtimehash becomes
651 # one or more extensions failed to load. mtimehash becomes
694 # meaningless because we do not know the paths of those extensions.
652 # meaningless because we do not know the paths of those extensions.
695 # set mtimehash to an illegal hash value to invalidate the server.
653 # set mtimehash to an illegal hash value to invalidate the server.
696 self.hashstate.mtimehash = ''
654 self.hashstate.mtimehash = ''
697
655
698 def _createsymlink(self):
656 def _createsymlink(self):
699 if self.baseaddress == self.address:
657 if self.baseaddress == self.address:
700 return
658 return
701 tempaddress = _tempaddress(self.baseaddress)
659 tempaddress = _tempaddress(self.baseaddress)
702 os.symlink(os.path.basename(self.address), tempaddress)
660 os.symlink(os.path.basename(self.address), tempaddress)
703 util.rename(tempaddress, self.baseaddress)
661 util.rename(tempaddress, self.baseaddress)
704
662
705 def run(self):
663 def run(self):
706 try:
664 try:
707 self.server.serve_forever()
665 self.server.serve_forever()
708 finally:
666 finally:
709 self.server.unlinksocketfile()
667 self.server.unlinksocketfile()
710
668
711 def uisetup(ui):
669 def uisetup(ui):
712 commandserver._servicemap['chgunix'] = chgunixservice
670 commandserver._servicemap['chgunix'] = chgunixservice
713
671
714 # CHGINTERNALMARK is temporarily set by chg client to detect if chg will
672 # CHGINTERNALMARK is temporarily set by chg client to detect if chg will
715 # start another chg. drop it to avoid possible side effects.
673 # start another chg. drop it to avoid possible side effects.
716 if 'CHGINTERNALMARK' in os.environ:
674 if 'CHGINTERNALMARK' in os.environ:
717 del os.environ['CHGINTERNALMARK']
675 del os.environ['CHGINTERNALMARK']
@@ -1,410 +1,422 b''
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Matt Mackall <mpm@selenic.com>
3 # Copyright Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import gc
11 import os
12 import os
13 import random
12 import struct
14 import struct
13 import sys
15 import sys
14 import traceback
16 import traceback
15
17
16 from .i18n import _
18 from .i18n import _
17 from . import (
19 from . import (
18 encoding,
20 encoding,
19 error,
21 error,
20 util,
22 util,
21 )
23 )
22
24
23 socketserver = util.socketserver
25 socketserver = util.socketserver
24
26
25 logfile = None
27 logfile = None
26
28
27 def log(*args):
29 def log(*args):
28 if not logfile:
30 if not logfile:
29 return
31 return
30
32
31 for a in args:
33 for a in args:
32 logfile.write(str(a))
34 logfile.write(str(a))
33
35
34 logfile.flush()
36 logfile.flush()
35
37
36 class channeledoutput(object):
38 class channeledoutput(object):
37 """
39 """
38 Write data to out in the following format:
40 Write data to out in the following format:
39
41
40 data length (unsigned int),
42 data length (unsigned int),
41 data
43 data
42 """
44 """
43 def __init__(self, out, channel):
45 def __init__(self, out, channel):
44 self.out = out
46 self.out = out
45 self.channel = channel
47 self.channel = channel
46
48
47 @property
49 @property
48 def name(self):
50 def name(self):
49 return '<%c-channel>' % self.channel
51 return '<%c-channel>' % self.channel
50
52
51 def write(self, data):
53 def write(self, data):
52 if not data:
54 if not data:
53 return
55 return
54 self.out.write(struct.pack('>cI', self.channel, len(data)))
56 self.out.write(struct.pack('>cI', self.channel, len(data)))
55 self.out.write(data)
57 self.out.write(data)
56 self.out.flush()
58 self.out.flush()
57
59
58 def __getattr__(self, attr):
60 def __getattr__(self, attr):
59 if attr in ('isatty', 'fileno', 'tell', 'seek'):
61 if attr in ('isatty', 'fileno', 'tell', 'seek'):
60 raise AttributeError(attr)
62 raise AttributeError(attr)
61 return getattr(self.out, attr)
63 return getattr(self.out, attr)
62
64
63 class channeledinput(object):
65 class channeledinput(object):
64 """
66 """
65 Read data from in_.
67 Read data from in_.
66
68
67 Requests for input are written to out in the following format:
69 Requests for input are written to out in the following format:
68 channel identifier - 'I' for plain input, 'L' line based (1 byte)
70 channel identifier - 'I' for plain input, 'L' line based (1 byte)
69 how many bytes to send at most (unsigned int),
71 how many bytes to send at most (unsigned int),
70
72
71 The client replies with:
73 The client replies with:
72 data length (unsigned int), 0 meaning EOF
74 data length (unsigned int), 0 meaning EOF
73 data
75 data
74 """
76 """
75
77
76 maxchunksize = 4 * 1024
78 maxchunksize = 4 * 1024
77
79
78 def __init__(self, in_, out, channel):
80 def __init__(self, in_, out, channel):
79 self.in_ = in_
81 self.in_ = in_
80 self.out = out
82 self.out = out
81 self.channel = channel
83 self.channel = channel
82
84
83 @property
85 @property
84 def name(self):
86 def name(self):
85 return '<%c-channel>' % self.channel
87 return '<%c-channel>' % self.channel
86
88
87 def read(self, size=-1):
89 def read(self, size=-1):
88 if size < 0:
90 if size < 0:
89 # if we need to consume all the clients input, ask for 4k chunks
91 # if we need to consume all the clients input, ask for 4k chunks
90 # so the pipe doesn't fill up risking a deadlock
92 # so the pipe doesn't fill up risking a deadlock
91 size = self.maxchunksize
93 size = self.maxchunksize
92 s = self._read(size, self.channel)
94 s = self._read(size, self.channel)
93 buf = s
95 buf = s
94 while s:
96 while s:
95 s = self._read(size, self.channel)
97 s = self._read(size, self.channel)
96 buf += s
98 buf += s
97
99
98 return buf
100 return buf
99 else:
101 else:
100 return self._read(size, self.channel)
102 return self._read(size, self.channel)
101
103
102 def _read(self, size, channel):
104 def _read(self, size, channel):
103 if not size:
105 if not size:
104 return ''
106 return ''
105 assert size > 0
107 assert size > 0
106
108
107 # tell the client we need at most size bytes
109 # tell the client we need at most size bytes
108 self.out.write(struct.pack('>cI', channel, size))
110 self.out.write(struct.pack('>cI', channel, size))
109 self.out.flush()
111 self.out.flush()
110
112
111 length = self.in_.read(4)
113 length = self.in_.read(4)
112 length = struct.unpack('>I', length)[0]
114 length = struct.unpack('>I', length)[0]
113 if not length:
115 if not length:
114 return ''
116 return ''
115 else:
117 else:
116 return self.in_.read(length)
118 return self.in_.read(length)
117
119
118 def readline(self, size=-1):
120 def readline(self, size=-1):
119 if size < 0:
121 if size < 0:
120 size = self.maxchunksize
122 size = self.maxchunksize
121 s = self._read(size, 'L')
123 s = self._read(size, 'L')
122 buf = s
124 buf = s
123 # keep asking for more until there's either no more or
125 # keep asking for more until there's either no more or
124 # we got a full line
126 # we got a full line
125 while s and s[-1] != '\n':
127 while s and s[-1] != '\n':
126 s = self._read(size, 'L')
128 s = self._read(size, 'L')
127 buf += s
129 buf += s
128
130
129 return buf
131 return buf
130 else:
132 else:
131 return self._read(size, 'L')
133 return self._read(size, 'L')
132
134
133 def __iter__(self):
135 def __iter__(self):
134 return self
136 return self
135
137
136 def next(self):
138 def next(self):
137 l = self.readline()
139 l = self.readline()
138 if not l:
140 if not l:
139 raise StopIteration
141 raise StopIteration
140 return l
142 return l
141
143
142 def __getattr__(self, attr):
144 def __getattr__(self, attr):
143 if attr in ('isatty', 'fileno', 'tell', 'seek'):
145 if attr in ('isatty', 'fileno', 'tell', 'seek'):
144 raise AttributeError(attr)
146 raise AttributeError(attr)
145 return getattr(self.in_, attr)
147 return getattr(self.in_, attr)
146
148
147 class server(object):
149 class server(object):
148 """
150 """
149 Listens for commands on fin, runs them and writes the output on a channel
151 Listens for commands on fin, runs them and writes the output on a channel
150 based stream to fout.
152 based stream to fout.
151 """
153 """
152 def __init__(self, ui, repo, fin, fout):
154 def __init__(self, ui, repo, fin, fout):
153 self.cwd = os.getcwd()
155 self.cwd = os.getcwd()
154
156
155 # developer config: cmdserver.log
157 # developer config: cmdserver.log
156 logpath = ui.config("cmdserver", "log", None)
158 logpath = ui.config("cmdserver", "log", None)
157 if logpath:
159 if logpath:
158 global logfile
160 global logfile
159 if logpath == '-':
161 if logpath == '-':
160 # write log on a special 'd' (debug) channel
162 # write log on a special 'd' (debug) channel
161 logfile = channeledoutput(fout, 'd')
163 logfile = channeledoutput(fout, 'd')
162 else:
164 else:
163 logfile = open(logpath, 'a')
165 logfile = open(logpath, 'a')
164
166
165 if repo:
167 if repo:
166 # the ui here is really the repo ui so take its baseui so we don't
168 # the ui here is really the repo ui so take its baseui so we don't
167 # end up with its local configuration
169 # end up with its local configuration
168 self.ui = repo.baseui
170 self.ui = repo.baseui
169 self.repo = repo
171 self.repo = repo
170 self.repoui = repo.ui
172 self.repoui = repo.ui
171 else:
173 else:
172 self.ui = ui
174 self.ui = ui
173 self.repo = self.repoui = None
175 self.repo = self.repoui = None
174
176
175 self.cerr = channeledoutput(fout, 'e')
177 self.cerr = channeledoutput(fout, 'e')
176 self.cout = channeledoutput(fout, 'o')
178 self.cout = channeledoutput(fout, 'o')
177 self.cin = channeledinput(fin, fout, 'I')
179 self.cin = channeledinput(fin, fout, 'I')
178 self.cresult = channeledoutput(fout, 'r')
180 self.cresult = channeledoutput(fout, 'r')
179
181
180 self.client = fin
182 self.client = fin
181
183
182 def cleanup(self):
184 def cleanup(self):
183 """release and restore resources taken during server session"""
185 """release and restore resources taken during server session"""
184 pass
186 pass
185
187
186 def _read(self, size):
188 def _read(self, size):
187 if not size:
189 if not size:
188 return ''
190 return ''
189
191
190 data = self.client.read(size)
192 data = self.client.read(size)
191
193
192 # is the other end closed?
194 # is the other end closed?
193 if not data:
195 if not data:
194 raise EOFError
196 raise EOFError
195
197
196 return data
198 return data
197
199
198 def _readstr(self):
200 def _readstr(self):
199 """read a string from the channel
201 """read a string from the channel
200
202
201 format:
203 format:
202 data length (uint32), data
204 data length (uint32), data
203 """
205 """
204 length = struct.unpack('>I', self._read(4))[0]
206 length = struct.unpack('>I', self._read(4))[0]
205 if not length:
207 if not length:
206 return ''
208 return ''
207 return self._read(length)
209 return self._read(length)
208
210
209 def _readlist(self):
211 def _readlist(self):
210 """read a list of NULL separated strings from the channel"""
212 """read a list of NULL separated strings from the channel"""
211 s = self._readstr()
213 s = self._readstr()
212 if s:
214 if s:
213 return s.split('\0')
215 return s.split('\0')
214 else:
216 else:
215 return []
217 return []
216
218
217 def runcommand(self):
219 def runcommand(self):
218 """ reads a list of \0 terminated arguments, executes
220 """ reads a list of \0 terminated arguments, executes
219 and writes the return code to the result channel """
221 and writes the return code to the result channel """
220 from . import dispatch # avoid cycle
222 from . import dispatch # avoid cycle
221
223
222 args = self._readlist()
224 args = self._readlist()
223
225
224 # copy the uis so changes (e.g. --config or --verbose) don't
226 # copy the uis so changes (e.g. --config or --verbose) don't
225 # persist between requests
227 # persist between requests
226 copiedui = self.ui.copy()
228 copiedui = self.ui.copy()
227 uis = [copiedui]
229 uis = [copiedui]
228 if self.repo:
230 if self.repo:
229 self.repo.baseui = copiedui
231 self.repo.baseui = copiedui
230 # clone ui without using ui.copy because this is protected
232 # clone ui without using ui.copy because this is protected
231 repoui = self.repoui.__class__(self.repoui)
233 repoui = self.repoui.__class__(self.repoui)
232 repoui.copy = copiedui.copy # redo copy protection
234 repoui.copy = copiedui.copy # redo copy protection
233 uis.append(repoui)
235 uis.append(repoui)
234 self.repo.ui = self.repo.dirstate._ui = repoui
236 self.repo.ui = self.repo.dirstate._ui = repoui
235 self.repo.invalidateall()
237 self.repo.invalidateall()
236
238
237 for ui in uis:
239 for ui in uis:
238 ui.resetstate()
240 ui.resetstate()
239 # any kind of interaction must use server channels, but chg may
241 # any kind of interaction must use server channels, but chg may
240 # replace channels by fully functional tty files. so nontty is
242 # replace channels by fully functional tty files. so nontty is
241 # enforced only if cin is a channel.
243 # enforced only if cin is a channel.
242 if not util.safehasattr(self.cin, 'fileno'):
244 if not util.safehasattr(self.cin, 'fileno'):
243 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
245 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
244
246
245 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
247 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
246 self.cout, self.cerr)
248 self.cout, self.cerr)
247
249
248 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
250 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
249
251
250 # restore old cwd
252 # restore old cwd
251 if '--cwd' in args:
253 if '--cwd' in args:
252 os.chdir(self.cwd)
254 os.chdir(self.cwd)
253
255
254 self.cresult.write(struct.pack('>i', int(ret)))
256 self.cresult.write(struct.pack('>i', int(ret)))
255
257
256 def getencoding(self):
258 def getencoding(self):
257 """ writes the current encoding to the result channel """
259 """ writes the current encoding to the result channel """
258 self.cresult.write(encoding.encoding)
260 self.cresult.write(encoding.encoding)
259
261
260 def serveone(self):
262 def serveone(self):
261 cmd = self.client.readline()[:-1]
263 cmd = self.client.readline()[:-1]
262 if cmd:
264 if cmd:
263 handler = self.capabilities.get(cmd)
265 handler = self.capabilities.get(cmd)
264 if handler:
266 if handler:
265 handler(self)
267 handler(self)
266 else:
268 else:
267 # clients are expected to check what commands are supported by
269 # clients are expected to check what commands are supported by
268 # looking at the servers capabilities
270 # looking at the servers capabilities
269 raise error.Abort(_('unknown command %s') % cmd)
271 raise error.Abort(_('unknown command %s') % cmd)
270
272
271 return cmd != ''
273 return cmd != ''
272
274
273 capabilities = {'runcommand' : runcommand,
275 capabilities = {'runcommand' : runcommand,
274 'getencoding' : getencoding}
276 'getencoding' : getencoding}
275
277
276 def serve(self):
278 def serve(self):
277 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
279 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
278 hellomsg += '\n'
280 hellomsg += '\n'
279 hellomsg += 'encoding: ' + encoding.encoding
281 hellomsg += 'encoding: ' + encoding.encoding
280 hellomsg += '\n'
282 hellomsg += '\n'
281 hellomsg += 'pid: %d' % util.getpid()
283 hellomsg += 'pid: %d' % util.getpid()
282
284
283 # write the hello msg in -one- chunk
285 # write the hello msg in -one- chunk
284 self.cout.write(hellomsg)
286 self.cout.write(hellomsg)
285
287
286 try:
288 try:
287 while self.serveone():
289 while self.serveone():
288 pass
290 pass
289 except EOFError:
291 except EOFError:
290 # we'll get here if the client disconnected while we were reading
292 # we'll get here if the client disconnected while we were reading
291 # its request
293 # its request
292 return 1
294 return 1
293
295
294 return 0
296 return 0
295
297
296 def _protectio(ui):
298 def _protectio(ui):
297 """ duplicates streams and redirect original to null if ui uses stdio """
299 """ duplicates streams and redirect original to null if ui uses stdio """
298 ui.flush()
300 ui.flush()
299 newfiles = []
301 newfiles = []
300 nullfd = os.open(os.devnull, os.O_RDWR)
302 nullfd = os.open(os.devnull, os.O_RDWR)
301 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
303 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
302 (ui.fout, sys.stdout, 'wb')]:
304 (ui.fout, sys.stdout, 'wb')]:
303 if f is sysf:
305 if f is sysf:
304 newfd = os.dup(f.fileno())
306 newfd = os.dup(f.fileno())
305 os.dup2(nullfd, f.fileno())
307 os.dup2(nullfd, f.fileno())
306 f = os.fdopen(newfd, mode)
308 f = os.fdopen(newfd, mode)
307 newfiles.append(f)
309 newfiles.append(f)
308 os.close(nullfd)
310 os.close(nullfd)
309 return tuple(newfiles)
311 return tuple(newfiles)
310
312
311 def _restoreio(ui, fin, fout):
313 def _restoreio(ui, fin, fout):
312 """ restores streams from duplicated ones """
314 """ restores streams from duplicated ones """
313 ui.flush()
315 ui.flush()
314 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
316 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
315 if f is not uif:
317 if f is not uif:
316 os.dup2(f.fileno(), uif.fileno())
318 os.dup2(f.fileno(), uif.fileno())
317 f.close()
319 f.close()
318
320
319 class pipeservice(object):
321 class pipeservice(object):
320 def __init__(self, ui, repo, opts):
322 def __init__(self, ui, repo, opts):
321 self.ui = ui
323 self.ui = ui
322 self.repo = repo
324 self.repo = repo
323
325
324 def init(self):
326 def init(self):
325 pass
327 pass
326
328
327 def run(self):
329 def run(self):
328 ui = self.ui
330 ui = self.ui
329 # redirect stdio to null device so that broken extensions or in-process
331 # redirect stdio to null device so that broken extensions or in-process
330 # hooks will never cause corruption of channel protocol.
332 # hooks will never cause corruption of channel protocol.
331 fin, fout = _protectio(ui)
333 fin, fout = _protectio(ui)
332 try:
334 try:
333 sv = server(ui, self.repo, fin, fout)
335 sv = server(ui, self.repo, fin, fout)
334 return sv.serve()
336 return sv.serve()
335 finally:
337 finally:
336 sv.cleanup()
338 sv.cleanup()
337 _restoreio(ui, fin, fout)
339 _restoreio(ui, fin, fout)
338
340
339 class _requesthandler(socketserver.StreamRequestHandler):
341 class _requesthandler(socketserver.StreamRequestHandler):
340 def handle(self):
342 def handle(self):
343 # use a different process group from the master process, making this
344 # process pass kernel "is_current_pgrp_orphaned" check so signals like
345 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
346 os.setpgid(0, 0)
347 # change random state otherwise forked request handlers would have a
348 # same state inherited from parent.
349 random.seed()
341 ui = self.server.ui
350 ui = self.server.ui
342 sv = None
351 sv = None
343 try:
352 try:
344 sv = self._createcmdserver()
353 sv = self._createcmdserver()
345 try:
354 try:
346 sv.serve()
355 sv.serve()
347 # handle exceptions that may be raised by command server. most of
356 # handle exceptions that may be raised by command server. most of
348 # known exceptions are caught by dispatch.
357 # known exceptions are caught by dispatch.
349 except error.Abort as inst:
358 except error.Abort as inst:
350 ui.warn(_('abort: %s\n') % inst)
359 ui.warn(_('abort: %s\n') % inst)
351 except IOError as inst:
360 except IOError as inst:
352 if inst.errno != errno.EPIPE:
361 if inst.errno != errno.EPIPE:
353 raise
362 raise
354 except KeyboardInterrupt:
363 except KeyboardInterrupt:
355 pass
364 pass
356 finally:
365 finally:
357 sv.cleanup()
366 sv.cleanup()
358 except: # re-raises
367 except: # re-raises
359 # also write traceback to error channel. otherwise client cannot
368 # also write traceback to error channel. otherwise client cannot
360 # see it because it is written to server's stderr by default.
369 # see it because it is written to server's stderr by default.
361 if sv:
370 if sv:
362 cerr = sv.cerr
371 cerr = sv.cerr
363 else:
372 else:
364 cerr = channeledoutput(self.wfile, 'e')
373 cerr = channeledoutput(self.wfile, 'e')
365 traceback.print_exc(file=cerr)
374 traceback.print_exc(file=cerr)
366 raise
375 raise
376 finally:
377 # trigger __del__ since ForkingMixIn uses os._exit
378 gc.collect()
367
379
368 def _createcmdserver(self):
380 def _createcmdserver(self):
369 ui = self.server.ui
381 ui = self.server.ui
370 repo = self.server.repo
382 repo = self.server.repo
371 return server(ui, repo, self.rfile, self.wfile)
383 return server(ui, repo, self.rfile, self.wfile)
372
384
373 class unixservice(object):
385 class unixservice(object):
374 """
386 """
375 Listens on unix domain socket and forks server per connection
387 Listens on unix domain socket and forks server per connection
376 """
388 """
377 def __init__(self, ui, repo, opts):
389 def __init__(self, ui, repo, opts):
378 self.ui = ui
390 self.ui = ui
379 self.repo = repo
391 self.repo = repo
380 self.address = opts['address']
392 self.address = opts['address']
381 if not util.safehasattr(socketserver, 'UnixStreamServer'):
393 if not util.safehasattr(socketserver, 'UnixStreamServer'):
382 raise error.Abort(_('unsupported platform'))
394 raise error.Abort(_('unsupported platform'))
383 if not self.address:
395 if not self.address:
384 raise error.Abort(_('no socket path specified with --address'))
396 raise error.Abort(_('no socket path specified with --address'))
385
397
386 def init(self):
398 def init(self):
387 class cls(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
399 class cls(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
388 ui = self.ui
400 ui = self.ui
389 repo = self.repo
401 repo = self.repo
390 self.server = cls(self.address, _requesthandler)
402 self.server = cls(self.address, _requesthandler)
391 self.ui.status(_('listening at %s\n') % self.address)
403 self.ui.status(_('listening at %s\n') % self.address)
392 self.ui.flush() # avoid buffering of status message
404 self.ui.flush() # avoid buffering of status message
393
405
394 def run(self):
406 def run(self):
395 try:
407 try:
396 self.server.serve_forever()
408 self.server.serve_forever()
397 finally:
409 finally:
398 os.unlink(self.address)
410 os.unlink(self.address)
399
411
400 _servicemap = {
412 _servicemap = {
401 'pipe': pipeservice,
413 'pipe': pipeservice,
402 'unix': unixservice,
414 'unix': unixservice,
403 }
415 }
404
416
405 def createservice(ui, repo, opts):
417 def createservice(ui, repo, opts):
406 mode = opts['cmdserver']
418 mode = opts['cmdserver']
407 try:
419 try:
408 return _servicemap[mode](ui, repo, opts)
420 return _servicemap[mode](ui, repo, opts)
409 except KeyError:
421 except KeyError:
410 raise error.Abort(_('unknown mode %s') % mode)
422 raise error.Abort(_('unknown mode %s') % mode)
General Comments 0
You need to be logged in to leave comments. Login now