##// END OF EJS Templates
commandserver: install logger to record server events through canonical API...
Yuya Nishihara -
r40859:82210d88 default
parent child Browse files
Show More
@@ -1,636 +1,638 b''
1 # chgserver.py - command server extension for cHg
1 # chgserver.py - command server extension for cHg
2 #
2 #
3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 """command server extension for cHg
8 """command server extension for cHg
9
9
10 'S' channel (read/write)
10 'S' channel (read/write)
11 propagate ui.system() request to client
11 propagate ui.system() request to client
12
12
13 'attachio' command
13 'attachio' command
14 attach client's stdio passed by sendmsg()
14 attach client's stdio passed by sendmsg()
15
15
16 'chdir' command
16 'chdir' command
17 change current directory
17 change current directory
18
18
19 'setenv' command
19 'setenv' command
20 replace os.environ completely
20 replace os.environ completely
21
21
22 'setumask' command (DEPRECATED)
22 'setumask' command (DEPRECATED)
23 'setumask2' command
23 'setumask2' command
24 set umask
24 set umask
25
25
26 'validate' command
26 'validate' command
27 reload the config and check if the server is up to date
27 reload the config and check if the server is up to date
28
28
29 Config
29 Config
30 ------
30 ------
31
31
32 ::
32 ::
33
33
34 [chgserver]
34 [chgserver]
35 # how long (in seconds) should an idle chg server exit
35 # how long (in seconds) should an idle chg server exit
36 idletimeout = 3600
36 idletimeout = 3600
37
37
38 # whether to skip config or env change checks
38 # whether to skip config or env change checks
39 skiphash = False
39 skiphash = False
40 """
40 """
41
41
42 from __future__ import absolute_import
42 from __future__ import absolute_import
43
43
44 import hashlib
44 import hashlib
45 import inspect
45 import inspect
46 import os
46 import os
47 import re
47 import re
48 import socket
48 import socket
49 import stat
49 import stat
50 import struct
50 import struct
51 import time
51 import time
52
52
53 from .i18n import _
53 from .i18n import _
54
54
55 from . import (
55 from . import (
56 commandserver,
56 commandserver,
57 encoding,
57 encoding,
58 error,
58 error,
59 extensions,
59 extensions,
60 node,
60 node,
61 pycompat,
61 pycompat,
62 util,
62 util,
63 )
63 )
64
64
65 from .utils import (
65 from .utils import (
66 procutil,
66 procutil,
67 )
67 )
68
68
69 _log = commandserver.log
69 _log = commandserver.log
70
70
71 def _hashlist(items):
71 def _hashlist(items):
72 """return sha1 hexdigest for a list"""
72 """return sha1 hexdigest for a list"""
73 return node.hex(hashlib.sha1(str(items)).digest())
73 return node.hex(hashlib.sha1(str(items)).digest())
74
74
75 # sensitive config sections affecting confighash
75 # sensitive config sections affecting confighash
76 _configsections = [
76 _configsections = [
77 'alias', # affects global state commands.table
77 'alias', # affects global state commands.table
78 'eol', # uses setconfig('eol', ...)
78 'eol', # uses setconfig('eol', ...)
79 'extdiff', # uisetup will register new commands
79 'extdiff', # uisetup will register new commands
80 'extensions',
80 'extensions',
81 ]
81 ]
82
82
83 _configsectionitems = [
83 _configsectionitems = [
84 ('commands', 'show.aliasprefix'), # show.py reads it in extsetup
84 ('commands', 'show.aliasprefix'), # show.py reads it in extsetup
85 ]
85 ]
86
86
87 # sensitive environment variables affecting confighash
87 # sensitive environment variables affecting confighash
88 _envre = re.compile(r'''\A(?:
88 _envre = re.compile(r'''\A(?:
89 CHGHG
89 CHGHG
90 |HG(?:DEMANDIMPORT|EMITWARNINGS|MODULEPOLICY|PROF|RCPATH)?
90 |HG(?:DEMANDIMPORT|EMITWARNINGS|MODULEPOLICY|PROF|RCPATH)?
91 |HG(?:ENCODING|PLAIN).*
91 |HG(?:ENCODING|PLAIN).*
92 |LANG(?:UAGE)?
92 |LANG(?:UAGE)?
93 |LC_.*
93 |LC_.*
94 |LD_.*
94 |LD_.*
95 |PATH
95 |PATH
96 |PYTHON.*
96 |PYTHON.*
97 |TERM(?:INFO)?
97 |TERM(?:INFO)?
98 |TZ
98 |TZ
99 )\Z''', re.X)
99 )\Z''', re.X)
100
100
101 def _confighash(ui):
101 def _confighash(ui):
102 """return a quick hash for detecting config/env changes
102 """return a quick hash for detecting config/env changes
103
103
104 confighash is the hash of sensitive config items and environment variables.
104 confighash is the hash of sensitive config items and environment variables.
105
105
106 for chgserver, it is designed that once confighash changes, the server is
106 for chgserver, it is designed that once confighash changes, the server is
107 not qualified to serve its client and should redirect the client to a new
107 not qualified to serve its client and should redirect the client to a new
108 server. different from mtimehash, confighash change will not mark the
108 server. different from mtimehash, confighash change will not mark the
109 server outdated and exit since the user can have different configs at the
109 server outdated and exit since the user can have different configs at the
110 same time.
110 same time.
111 """
111 """
112 sectionitems = []
112 sectionitems = []
113 for section in _configsections:
113 for section in _configsections:
114 sectionitems.append(ui.configitems(section))
114 sectionitems.append(ui.configitems(section))
115 for section, item in _configsectionitems:
115 for section, item in _configsectionitems:
116 sectionitems.append(ui.config(section, item))
116 sectionitems.append(ui.config(section, item))
117 sectionhash = _hashlist(sectionitems)
117 sectionhash = _hashlist(sectionitems)
118 # If $CHGHG is set, the change to $HG should not trigger a new chg server
118 # If $CHGHG is set, the change to $HG should not trigger a new chg server
119 if 'CHGHG' in encoding.environ:
119 if 'CHGHG' in encoding.environ:
120 ignored = {'HG'}
120 ignored = {'HG'}
121 else:
121 else:
122 ignored = set()
122 ignored = set()
123 envitems = [(k, v) for k, v in encoding.environ.iteritems()
123 envitems = [(k, v) for k, v in encoding.environ.iteritems()
124 if _envre.match(k) and k not in ignored]
124 if _envre.match(k) and k not in ignored]
125 envhash = _hashlist(sorted(envitems))
125 envhash = _hashlist(sorted(envitems))
126 return sectionhash[:6] + envhash[:6]
126 return sectionhash[:6] + envhash[:6]
127
127
128 def _getmtimepaths(ui):
128 def _getmtimepaths(ui):
129 """get a list of paths that should be checked to detect change
129 """get a list of paths that should be checked to detect change
130
130
131 The list will include:
131 The list will include:
132 - extensions (will not cover all files for complex extensions)
132 - extensions (will not cover all files for complex extensions)
133 - mercurial/__version__.py
133 - mercurial/__version__.py
134 - python binary
134 - python binary
135 """
135 """
136 modules = [m for n, m in extensions.extensions(ui)]
136 modules = [m for n, m in extensions.extensions(ui)]
137 try:
137 try:
138 from . import __version__
138 from . import __version__
139 modules.append(__version__)
139 modules.append(__version__)
140 except ImportError:
140 except ImportError:
141 pass
141 pass
142 files = [pycompat.sysexecutable]
142 files = [pycompat.sysexecutable]
143 for m in modules:
143 for m in modules:
144 try:
144 try:
145 files.append(inspect.getabsfile(m))
145 files.append(inspect.getabsfile(m))
146 except TypeError:
146 except TypeError:
147 pass
147 pass
148 return sorted(set(files))
148 return sorted(set(files))
149
149
150 def _mtimehash(paths):
150 def _mtimehash(paths):
151 """return a quick hash for detecting file changes
151 """return a quick hash for detecting file changes
152
152
153 mtimehash calls stat on given paths and calculate a hash based on size and
153 mtimehash calls stat on given paths and calculate a hash based on size and
154 mtime of each file. mtimehash does not read file content because reading is
154 mtime of each file. mtimehash does not read file content because reading is
155 expensive. therefore it's not 100% reliable for detecting content changes.
155 expensive. therefore it's not 100% reliable for detecting content changes.
156 it's possible to return different hashes for same file contents.
156 it's possible to return different hashes for same file contents.
157 it's also possible to return a same hash for different file contents for
157 it's also possible to return a same hash for different file contents for
158 some carefully crafted situation.
158 some carefully crafted situation.
159
159
160 for chgserver, it is designed that once mtimehash changes, the server is
160 for chgserver, it is designed that once mtimehash changes, the server is
161 considered outdated immediately and should no longer provide service.
161 considered outdated immediately and should no longer provide service.
162
162
163 mtimehash is not included in confighash because we only know the paths of
163 mtimehash is not included in confighash because we only know the paths of
164 extensions after importing them (there is imp.find_module but that faces
164 extensions after importing them (there is imp.find_module but that faces
165 race conditions). We need to calculate confighash without importing.
165 race conditions). We need to calculate confighash without importing.
166 """
166 """
167 def trystat(path):
167 def trystat(path):
168 try:
168 try:
169 st = os.stat(path)
169 st = os.stat(path)
170 return (st[stat.ST_MTIME], st.st_size)
170 return (st[stat.ST_MTIME], st.st_size)
171 except OSError:
171 except OSError:
172 # could be ENOENT, EPERM etc. not fatal in any case
172 # could be ENOENT, EPERM etc. not fatal in any case
173 pass
173 pass
174 return _hashlist(map(trystat, paths))[:12]
174 return _hashlist(map(trystat, paths))[:12]
175
175
176 class hashstate(object):
176 class hashstate(object):
177 """a structure storing confighash, mtimehash, paths used for mtimehash"""
177 """a structure storing confighash, mtimehash, paths used for mtimehash"""
178 def __init__(self, confighash, mtimehash, mtimepaths):
178 def __init__(self, confighash, mtimehash, mtimepaths):
179 self.confighash = confighash
179 self.confighash = confighash
180 self.mtimehash = mtimehash
180 self.mtimehash = mtimehash
181 self.mtimepaths = mtimepaths
181 self.mtimepaths = mtimepaths
182
182
183 @staticmethod
183 @staticmethod
184 def fromui(ui, mtimepaths=None):
184 def fromui(ui, mtimepaths=None):
185 if mtimepaths is None:
185 if mtimepaths is None:
186 mtimepaths = _getmtimepaths(ui)
186 mtimepaths = _getmtimepaths(ui)
187 confighash = _confighash(ui)
187 confighash = _confighash(ui)
188 mtimehash = _mtimehash(mtimepaths)
188 mtimehash = _mtimehash(mtimepaths)
189 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
189 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
190 return hashstate(confighash, mtimehash, mtimepaths)
190 return hashstate(confighash, mtimehash, mtimepaths)
191
191
192 def _newchgui(srcui, csystem, attachio):
192 def _newchgui(srcui, csystem, attachio):
193 class chgui(srcui.__class__):
193 class chgui(srcui.__class__):
194 def __init__(self, src=None):
194 def __init__(self, src=None):
195 super(chgui, self).__init__(src)
195 super(chgui, self).__init__(src)
196 if src:
196 if src:
197 self._csystem = getattr(src, '_csystem', csystem)
197 self._csystem = getattr(src, '_csystem', csystem)
198 else:
198 else:
199 self._csystem = csystem
199 self._csystem = csystem
200
200
201 def _runsystem(self, cmd, environ, cwd, out):
201 def _runsystem(self, cmd, environ, cwd, out):
202 # fallback to the original system method if
202 # fallback to the original system method if
203 # a. the output stream is not stdout (e.g. stderr, cStringIO),
203 # a. the output stream is not stdout (e.g. stderr, cStringIO),
204 # b. or stdout is redirected by protectstdio(),
204 # b. or stdout is redirected by protectstdio(),
205 # because the chg client is not aware of these situations and
205 # because the chg client is not aware of these situations and
206 # will behave differently (i.e. write to stdout).
206 # will behave differently (i.e. write to stdout).
207 if (out is not self.fout
207 if (out is not self.fout
208 or not util.safehasattr(self.fout, 'fileno')
208 or not util.safehasattr(self.fout, 'fileno')
209 or self.fout.fileno() != procutil.stdout.fileno()
209 or self.fout.fileno() != procutil.stdout.fileno()
210 or self._finoutredirected):
210 or self._finoutredirected):
211 return procutil.system(cmd, environ=environ, cwd=cwd, out=out)
211 return procutil.system(cmd, environ=environ, cwd=cwd, out=out)
212 self.flush()
212 self.flush()
213 return self._csystem(cmd, procutil.shellenviron(environ), cwd)
213 return self._csystem(cmd, procutil.shellenviron(environ), cwd)
214
214
215 def _runpager(self, cmd, env=None):
215 def _runpager(self, cmd, env=None):
216 self._csystem(cmd, procutil.shellenviron(env), type='pager',
216 self._csystem(cmd, procutil.shellenviron(env), type='pager',
217 cmdtable={'attachio': attachio})
217 cmdtable={'attachio': attachio})
218 return True
218 return True
219
219
220 return chgui(srcui)
220 return chgui(srcui)
221
221
222 def _loadnewui(srcui, args):
222 def _loadnewui(srcui, args, cdebug):
223 from . import dispatch # avoid cycle
223 from . import dispatch # avoid cycle
224
224
225 newui = srcui.__class__.load()
225 newui = srcui.__class__.load()
226 for a in ['fin', 'fout', 'ferr', 'environ']:
226 for a in ['fin', 'fout', 'ferr', 'environ']:
227 setattr(newui, a, getattr(srcui, a))
227 setattr(newui, a, getattr(srcui, a))
228 if util.safehasattr(srcui, '_csystem'):
228 if util.safehasattr(srcui, '_csystem'):
229 newui._csystem = srcui._csystem
229 newui._csystem = srcui._csystem
230
230
231 # command line args
231 # command line args
232 options = dispatch._earlyparseopts(newui, args)
232 options = dispatch._earlyparseopts(newui, args)
233 dispatch._parseconfig(newui, options['config'])
233 dispatch._parseconfig(newui, options['config'])
234
234
235 # stolen from tortoisehg.util.copydynamicconfig()
235 # stolen from tortoisehg.util.copydynamicconfig()
236 for section, name, value in srcui.walkconfig():
236 for section, name, value in srcui.walkconfig():
237 source = srcui.configsource(section, name)
237 source = srcui.configsource(section, name)
238 if ':' in source or source == '--config' or source.startswith('$'):
238 if ':' in source or source == '--config' or source.startswith('$'):
239 # path:line or command line, or environ
239 # path:line or command line, or environ
240 continue
240 continue
241 newui.setconfig(section, name, value, source)
241 newui.setconfig(section, name, value, source)
242
242
243 # load wd and repo config, copied from dispatch.py
243 # load wd and repo config, copied from dispatch.py
244 cwd = options['cwd']
244 cwd = options['cwd']
245 cwd = cwd and os.path.realpath(cwd) or None
245 cwd = cwd and os.path.realpath(cwd) or None
246 rpath = options['repository']
246 rpath = options['repository']
247 path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)
247 path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)
248
248
249 extensions.populateui(newui)
249 extensions.populateui(newui)
250 commandserver.setuplogging(newui, fp=cdebug)
250 if newui is not newlui:
251 if newui is not newlui:
251 extensions.populateui(newlui)
252 extensions.populateui(newlui)
253 commandserver.setuplogging(newlui, fp=cdebug)
252
254
253 return (newui, newlui)
255 return (newui, newlui)
254
256
255 class channeledsystem(object):
257 class channeledsystem(object):
256 """Propagate ui.system() request in the following format:
258 """Propagate ui.system() request in the following format:
257
259
258 payload length (unsigned int),
260 payload length (unsigned int),
259 type, '\0',
261 type, '\0',
260 cmd, '\0',
262 cmd, '\0',
261 cwd, '\0',
263 cwd, '\0',
262 envkey, '=', val, '\0',
264 envkey, '=', val, '\0',
263 ...
265 ...
264 envkey, '=', val
266 envkey, '=', val
265
267
266 if type == 'system', waits for:
268 if type == 'system', waits for:
267
269
268 exitcode length (unsigned int),
270 exitcode length (unsigned int),
269 exitcode (int)
271 exitcode (int)
270
272
271 if type == 'pager', repetitively waits for a command name ending with '\n'
273 if type == 'pager', repetitively waits for a command name ending with '\n'
272 and executes it defined by cmdtable, or exits the loop if the command name
274 and executes it defined by cmdtable, or exits the loop if the command name
273 is empty.
275 is empty.
274 """
276 """
275 def __init__(self, in_, out, channel):
277 def __init__(self, in_, out, channel):
276 self.in_ = in_
278 self.in_ = in_
277 self.out = out
279 self.out = out
278 self.channel = channel
280 self.channel = channel
279
281
280 def __call__(self, cmd, environ, cwd=None, type='system', cmdtable=None):
282 def __call__(self, cmd, environ, cwd=None, type='system', cmdtable=None):
281 args = [type, procutil.quotecommand(cmd), os.path.abspath(cwd or '.')]
283 args = [type, procutil.quotecommand(cmd), os.path.abspath(cwd or '.')]
282 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
284 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
283 data = '\0'.join(args)
285 data = '\0'.join(args)
284 self.out.write(struct.pack('>cI', self.channel, len(data)))
286 self.out.write(struct.pack('>cI', self.channel, len(data)))
285 self.out.write(data)
287 self.out.write(data)
286 self.out.flush()
288 self.out.flush()
287
289
288 if type == 'system':
290 if type == 'system':
289 length = self.in_.read(4)
291 length = self.in_.read(4)
290 length, = struct.unpack('>I', length)
292 length, = struct.unpack('>I', length)
291 if length != 4:
293 if length != 4:
292 raise error.Abort(_('invalid response'))
294 raise error.Abort(_('invalid response'))
293 rc, = struct.unpack('>i', self.in_.read(4))
295 rc, = struct.unpack('>i', self.in_.read(4))
294 return rc
296 return rc
295 elif type == 'pager':
297 elif type == 'pager':
296 while True:
298 while True:
297 cmd = self.in_.readline()[:-1]
299 cmd = self.in_.readline()[:-1]
298 if not cmd:
300 if not cmd:
299 break
301 break
300 if cmdtable and cmd in cmdtable:
302 if cmdtable and cmd in cmdtable:
301 _log('pager subcommand: %s' % cmd)
303 _log('pager subcommand: %s' % cmd)
302 cmdtable[cmd]()
304 cmdtable[cmd]()
303 else:
305 else:
304 raise error.Abort(_('unexpected command: %s') % cmd)
306 raise error.Abort(_('unexpected command: %s') % cmd)
305 else:
307 else:
306 raise error.ProgrammingError('invalid S channel type: %s' % type)
308 raise error.ProgrammingError('invalid S channel type: %s' % type)
307
309
308 _iochannels = [
310 _iochannels = [
309 # server.ch, ui.fp, mode
311 # server.ch, ui.fp, mode
310 ('cin', 'fin', r'rb'),
312 ('cin', 'fin', r'rb'),
311 ('cout', 'fout', r'wb'),
313 ('cout', 'fout', r'wb'),
312 ('cerr', 'ferr', r'wb'),
314 ('cerr', 'ferr', r'wb'),
313 ]
315 ]
314
316
315 class chgcmdserver(commandserver.server):
317 class chgcmdserver(commandserver.server):
316 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
318 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
317 super(chgcmdserver, self).__init__(
319 super(chgcmdserver, self).__init__(
318 _newchgui(ui, channeledsystem(fin, fout, 'S'), self.attachio),
320 _newchgui(ui, channeledsystem(fin, fout, 'S'), self.attachio),
319 repo, fin, fout)
321 repo, fin, fout)
320 self.clientsock = sock
322 self.clientsock = sock
321 self._ioattached = False
323 self._ioattached = False
322 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
324 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
323 self.hashstate = hashstate
325 self.hashstate = hashstate
324 self.baseaddress = baseaddress
326 self.baseaddress = baseaddress
325 if hashstate is not None:
327 if hashstate is not None:
326 self.capabilities = self.capabilities.copy()
328 self.capabilities = self.capabilities.copy()
327 self.capabilities['validate'] = chgcmdserver.validate
329 self.capabilities['validate'] = chgcmdserver.validate
328
330
329 def cleanup(self):
331 def cleanup(self):
330 super(chgcmdserver, self).cleanup()
332 super(chgcmdserver, self).cleanup()
331 # dispatch._runcatch() does not flush outputs if exception is not
333 # dispatch._runcatch() does not flush outputs if exception is not
332 # handled by dispatch._dispatch()
334 # handled by dispatch._dispatch()
333 self.ui.flush()
335 self.ui.flush()
334 self._restoreio()
336 self._restoreio()
335 self._ioattached = False
337 self._ioattached = False
336
338
337 def attachio(self):
339 def attachio(self):
338 """Attach to client's stdio passed via unix domain socket; all
340 """Attach to client's stdio passed via unix domain socket; all
339 channels except cresult will no longer be used
341 channels except cresult will no longer be used
340 """
342 """
341 # tell client to sendmsg() with 1-byte payload, which makes it
343 # tell client to sendmsg() with 1-byte payload, which makes it
342 # distinctive from "attachio\n" command consumed by client.read()
344 # distinctive from "attachio\n" command consumed by client.read()
343 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
345 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
344 clientfds = util.recvfds(self.clientsock.fileno())
346 clientfds = util.recvfds(self.clientsock.fileno())
345 _log('received fds: %r\n' % clientfds)
347 _log('received fds: %r\n' % clientfds)
346
348
347 ui = self.ui
349 ui = self.ui
348 ui.flush()
350 ui.flush()
349 self._saveio()
351 self._saveio()
350 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
352 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
351 assert fd > 0
353 assert fd > 0
352 fp = getattr(ui, fn)
354 fp = getattr(ui, fn)
353 os.dup2(fd, fp.fileno())
355 os.dup2(fd, fp.fileno())
354 os.close(fd)
356 os.close(fd)
355 if self._ioattached:
357 if self._ioattached:
356 continue
358 continue
357 # reset buffering mode when client is first attached. as we want
359 # reset buffering mode when client is first attached. as we want
358 # to see output immediately on pager, the mode stays unchanged
360 # to see output immediately on pager, the mode stays unchanged
359 # when client re-attached. ferr is unchanged because it should
361 # when client re-attached. ferr is unchanged because it should
360 # be unbuffered no matter if it is a tty or not.
362 # be unbuffered no matter if it is a tty or not.
361 if fn == 'ferr':
363 if fn == 'ferr':
362 newfp = fp
364 newfp = fp
363 else:
365 else:
364 # make it line buffered explicitly because the default is
366 # make it line buffered explicitly because the default is
365 # decided on first write(), where fout could be a pager.
367 # decided on first write(), where fout could be a pager.
366 if fp.isatty():
368 if fp.isatty():
367 bufsize = 1 # line buffered
369 bufsize = 1 # line buffered
368 else:
370 else:
369 bufsize = -1 # system default
371 bufsize = -1 # system default
370 newfp = os.fdopen(fp.fileno(), mode, bufsize)
372 newfp = os.fdopen(fp.fileno(), mode, bufsize)
371 setattr(ui, fn, newfp)
373 setattr(ui, fn, newfp)
372 setattr(self, cn, newfp)
374 setattr(self, cn, newfp)
373
375
374 self._ioattached = True
376 self._ioattached = True
375 self.cresult.write(struct.pack('>i', len(clientfds)))
377 self.cresult.write(struct.pack('>i', len(clientfds)))
376
378
377 def _saveio(self):
379 def _saveio(self):
378 if self._oldios:
380 if self._oldios:
379 return
381 return
380 ui = self.ui
382 ui = self.ui
381 for cn, fn, _mode in _iochannels:
383 for cn, fn, _mode in _iochannels:
382 ch = getattr(self, cn)
384 ch = getattr(self, cn)
383 fp = getattr(ui, fn)
385 fp = getattr(ui, fn)
384 fd = os.dup(fp.fileno())
386 fd = os.dup(fp.fileno())
385 self._oldios.append((ch, fp, fd))
387 self._oldios.append((ch, fp, fd))
386
388
387 def _restoreio(self):
389 def _restoreio(self):
388 ui = self.ui
390 ui = self.ui
389 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
391 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
390 newfp = getattr(ui, fn)
392 newfp = getattr(ui, fn)
391 # close newfp while it's associated with client; otherwise it
393 # close newfp while it's associated with client; otherwise it
392 # would be closed when newfp is deleted
394 # would be closed when newfp is deleted
393 if newfp is not fp:
395 if newfp is not fp:
394 newfp.close()
396 newfp.close()
395 # restore original fd: fp is open again
397 # restore original fd: fp is open again
396 os.dup2(fd, fp.fileno())
398 os.dup2(fd, fp.fileno())
397 os.close(fd)
399 os.close(fd)
398 setattr(self, cn, ch)
400 setattr(self, cn, ch)
399 setattr(ui, fn, fp)
401 setattr(ui, fn, fp)
400 del self._oldios[:]
402 del self._oldios[:]
401
403
402 def validate(self):
404 def validate(self):
403 """Reload the config and check if the server is up to date
405 """Reload the config and check if the server is up to date
404
406
405 Read a list of '\0' separated arguments.
407 Read a list of '\0' separated arguments.
406 Write a non-empty list of '\0' separated instruction strings or '\0'
408 Write a non-empty list of '\0' separated instruction strings or '\0'
407 if the list is empty.
409 if the list is empty.
408 An instruction string could be either:
410 An instruction string could be either:
409 - "unlink $path", the client should unlink the path to stop the
411 - "unlink $path", the client should unlink the path to stop the
410 outdated server.
412 outdated server.
411 - "redirect $path", the client should attempt to connect to $path
413 - "redirect $path", the client should attempt to connect to $path
412 first. If it does not work, start a new server. It implies
414 first. If it does not work, start a new server. It implies
413 "reconnect".
415 "reconnect".
414 - "exit $n", the client should exit directly with code n.
416 - "exit $n", the client should exit directly with code n.
415 This may happen if we cannot parse the config.
417 This may happen if we cannot parse the config.
416 - "reconnect", the client should close the connection and
418 - "reconnect", the client should close the connection and
417 reconnect.
419 reconnect.
418 If neither "reconnect" nor "redirect" is included in the instruction
420 If neither "reconnect" nor "redirect" is included in the instruction
419 list, the client can continue with this server after completing all
421 list, the client can continue with this server after completing all
420 the instructions.
422 the instructions.
421 """
423 """
422 from . import dispatch # avoid cycle
424 from . import dispatch # avoid cycle
423
425
424 args = self._readlist()
426 args = self._readlist()
425 try:
427 try:
426 self.ui, lui = _loadnewui(self.ui, args)
428 self.ui, lui = _loadnewui(self.ui, args, self.cdebug)
427 except error.ParseError as inst:
429 except error.ParseError as inst:
428 dispatch._formatparse(self.ui.warn, inst)
430 dispatch._formatparse(self.ui.warn, inst)
429 self.ui.flush()
431 self.ui.flush()
430 self.cresult.write('exit 255')
432 self.cresult.write('exit 255')
431 return
433 return
432 except error.Abort as inst:
434 except error.Abort as inst:
433 self.ui.error(_("abort: %s\n") % inst)
435 self.ui.error(_("abort: %s\n") % inst)
434 if inst.hint:
436 if inst.hint:
435 self.ui.error(_("(%s)\n") % inst.hint)
437 self.ui.error(_("(%s)\n") % inst.hint)
436 self.ui.flush()
438 self.ui.flush()
437 self.cresult.write('exit 255')
439 self.cresult.write('exit 255')
438 return
440 return
439 newhash = hashstate.fromui(lui, self.hashstate.mtimepaths)
441 newhash = hashstate.fromui(lui, self.hashstate.mtimepaths)
440 insts = []
442 insts = []
441 if newhash.mtimehash != self.hashstate.mtimehash:
443 if newhash.mtimehash != self.hashstate.mtimehash:
442 addr = _hashaddress(self.baseaddress, self.hashstate.confighash)
444 addr = _hashaddress(self.baseaddress, self.hashstate.confighash)
443 insts.append('unlink %s' % addr)
445 insts.append('unlink %s' % addr)
444 # mtimehash is empty if one or more extensions fail to load.
446 # mtimehash is empty if one or more extensions fail to load.
445 # to be compatible with hg, still serve the client this time.
447 # to be compatible with hg, still serve the client this time.
446 if self.hashstate.mtimehash:
448 if self.hashstate.mtimehash:
447 insts.append('reconnect')
449 insts.append('reconnect')
448 if newhash.confighash != self.hashstate.confighash:
450 if newhash.confighash != self.hashstate.confighash:
449 addr = _hashaddress(self.baseaddress, newhash.confighash)
451 addr = _hashaddress(self.baseaddress, newhash.confighash)
450 insts.append('redirect %s' % addr)
452 insts.append('redirect %s' % addr)
451 _log('validate: %s\n' % insts)
453 _log('validate: %s\n' % insts)
452 self.cresult.write('\0'.join(insts) or '\0')
454 self.cresult.write('\0'.join(insts) or '\0')
453
455
454 def chdir(self):
456 def chdir(self):
455 """Change current directory
457 """Change current directory
456
458
457 Note that the behavior of --cwd option is bit different from this.
459 Note that the behavior of --cwd option is bit different from this.
458 It does not affect --config parameter.
460 It does not affect --config parameter.
459 """
461 """
460 path = self._readstr()
462 path = self._readstr()
461 if not path:
463 if not path:
462 return
464 return
463 _log('chdir to %r\n' % path)
465 _log('chdir to %r\n' % path)
464 os.chdir(path)
466 os.chdir(path)
465
467
466 def setumask(self):
468 def setumask(self):
467 """Change umask (DEPRECATED)"""
469 """Change umask (DEPRECATED)"""
468 # BUG: this does not follow the message frame structure, but kept for
470 # BUG: this does not follow the message frame structure, but kept for
469 # backward compatibility with old chg clients for some time
471 # backward compatibility with old chg clients for some time
470 self._setumask(self._read(4))
472 self._setumask(self._read(4))
471
473
472 def setumask2(self):
474 def setumask2(self):
473 """Change umask"""
475 """Change umask"""
474 data = self._readstr()
476 data = self._readstr()
475 if len(data) != 4:
477 if len(data) != 4:
476 raise ValueError('invalid mask length in setumask2 request')
478 raise ValueError('invalid mask length in setumask2 request')
477 self._setumask(data)
479 self._setumask(data)
478
480
479 def _setumask(self, data):
481 def _setumask(self, data):
480 mask = struct.unpack('>I', data)[0]
482 mask = struct.unpack('>I', data)[0]
481 _log('setumask %r\n' % mask)
483 _log('setumask %r\n' % mask)
482 os.umask(mask)
484 os.umask(mask)
483
485
484 def runcommand(self):
486 def runcommand(self):
485 # pager may be attached within the runcommand session, which should
487 # pager may be attached within the runcommand session, which should
486 # be detached at the end of the session. otherwise the pager wouldn't
488 # be detached at the end of the session. otherwise the pager wouldn't
487 # receive EOF.
489 # receive EOF.
488 globaloldios = self._oldios
490 globaloldios = self._oldios
489 self._oldios = []
491 self._oldios = []
490 try:
492 try:
491 return super(chgcmdserver, self).runcommand()
493 return super(chgcmdserver, self).runcommand()
492 finally:
494 finally:
493 self._restoreio()
495 self._restoreio()
494 self._oldios = globaloldios
496 self._oldios = globaloldios
495
497
496 def setenv(self):
498 def setenv(self):
497 """Clear and update os.environ
499 """Clear and update os.environ
498
500
499 Note that not all variables can make an effect on the running process.
501 Note that not all variables can make an effect on the running process.
500 """
502 """
501 l = self._readlist()
503 l = self._readlist()
502 try:
504 try:
503 newenv = dict(s.split('=', 1) for s in l)
505 newenv = dict(s.split('=', 1) for s in l)
504 except ValueError:
506 except ValueError:
505 raise ValueError('unexpected value in setenv request')
507 raise ValueError('unexpected value in setenv request')
506 _log('setenv: %r\n' % sorted(newenv.keys()))
508 _log('setenv: %r\n' % sorted(newenv.keys()))
507 encoding.environ.clear()
509 encoding.environ.clear()
508 encoding.environ.update(newenv)
510 encoding.environ.update(newenv)
509
511
510 capabilities = commandserver.server.capabilities.copy()
512 capabilities = commandserver.server.capabilities.copy()
511 capabilities.update({'attachio': attachio,
513 capabilities.update({'attachio': attachio,
512 'chdir': chdir,
514 'chdir': chdir,
513 'runcommand': runcommand,
515 'runcommand': runcommand,
514 'setenv': setenv,
516 'setenv': setenv,
515 'setumask': setumask,
517 'setumask': setumask,
516 'setumask2': setumask2})
518 'setumask2': setumask2})
517
519
518 if util.safehasattr(procutil, 'setprocname'):
520 if util.safehasattr(procutil, 'setprocname'):
519 def setprocname(self):
521 def setprocname(self):
520 """Change process title"""
522 """Change process title"""
521 name = self._readstr()
523 name = self._readstr()
522 _log('setprocname: %r\n' % name)
524 _log('setprocname: %r\n' % name)
523 procutil.setprocname(name)
525 procutil.setprocname(name)
524 capabilities['setprocname'] = setprocname
526 capabilities['setprocname'] = setprocname
525
527
526 def _tempaddress(address):
528 def _tempaddress(address):
527 return '%s.%d.tmp' % (address, os.getpid())
529 return '%s.%d.tmp' % (address, os.getpid())
528
530
529 def _hashaddress(address, hashstr):
531 def _hashaddress(address, hashstr):
530 # if the basename of address contains '.', use only the left part. this
532 # if the basename of address contains '.', use only the left part. this
531 # makes it possible for the client to pass 'server.tmp$PID' and follow by
533 # makes it possible for the client to pass 'server.tmp$PID' and follow by
532 # an atomic rename to avoid locking when spawning new servers.
534 # an atomic rename to avoid locking when spawning new servers.
533 dirname, basename = os.path.split(address)
535 dirname, basename = os.path.split(address)
534 basename = basename.split('.', 1)[0]
536 basename = basename.split('.', 1)[0]
535 return '%s-%s' % (os.path.join(dirname, basename), hashstr)
537 return '%s-%s' % (os.path.join(dirname, basename), hashstr)
536
538
537 class chgunixservicehandler(object):
539 class chgunixservicehandler(object):
538 """Set of operations for chg services"""
540 """Set of operations for chg services"""
539
541
540 pollinterval = 1 # [sec]
542 pollinterval = 1 # [sec]
541
543
542 def __init__(self, ui):
544 def __init__(self, ui):
543 self.ui = ui
545 self.ui = ui
544 self._idletimeout = ui.configint('chgserver', 'idletimeout')
546 self._idletimeout = ui.configint('chgserver', 'idletimeout')
545 self._lastactive = time.time()
547 self._lastactive = time.time()
546
548
547 def bindsocket(self, sock, address):
549 def bindsocket(self, sock, address):
548 self._inithashstate(address)
550 self._inithashstate(address)
549 self._checkextensions()
551 self._checkextensions()
550 self._bind(sock)
552 self._bind(sock)
551 self._createsymlink()
553 self._createsymlink()
552 # no "listening at" message should be printed to simulate hg behavior
554 # no "listening at" message should be printed to simulate hg behavior
553
555
554 def _inithashstate(self, address):
556 def _inithashstate(self, address):
555 self._baseaddress = address
557 self._baseaddress = address
556 if self.ui.configbool('chgserver', 'skiphash'):
558 if self.ui.configbool('chgserver', 'skiphash'):
557 self._hashstate = None
559 self._hashstate = None
558 self._realaddress = address
560 self._realaddress = address
559 return
561 return
560 self._hashstate = hashstate.fromui(self.ui)
562 self._hashstate = hashstate.fromui(self.ui)
561 self._realaddress = _hashaddress(address, self._hashstate.confighash)
563 self._realaddress = _hashaddress(address, self._hashstate.confighash)
562
564
563 def _checkextensions(self):
565 def _checkextensions(self):
564 if not self._hashstate:
566 if not self._hashstate:
565 return
567 return
566 if extensions.notloaded():
568 if extensions.notloaded():
567 # one or more extensions failed to load. mtimehash becomes
569 # one or more extensions failed to load. mtimehash becomes
568 # meaningless because we do not know the paths of those extensions.
570 # meaningless because we do not know the paths of those extensions.
569 # set mtimehash to an illegal hash value to invalidate the server.
571 # set mtimehash to an illegal hash value to invalidate the server.
570 self._hashstate.mtimehash = ''
572 self._hashstate.mtimehash = ''
571
573
572 def _bind(self, sock):
574 def _bind(self, sock):
573 # use a unique temp address so we can stat the file and do ownership
575 # use a unique temp address so we can stat the file and do ownership
574 # check later
576 # check later
575 tempaddress = _tempaddress(self._realaddress)
577 tempaddress = _tempaddress(self._realaddress)
576 util.bindunixsocket(sock, tempaddress)
578 util.bindunixsocket(sock, tempaddress)
577 self._socketstat = os.stat(tempaddress)
579 self._socketstat = os.stat(tempaddress)
578 sock.listen(socket.SOMAXCONN)
580 sock.listen(socket.SOMAXCONN)
579 # rename will replace the old socket file if exists atomically. the
581 # rename will replace the old socket file if exists atomically. the
580 # old server will detect ownership change and exit.
582 # old server will detect ownership change and exit.
581 util.rename(tempaddress, self._realaddress)
583 util.rename(tempaddress, self._realaddress)
582
584
583 def _createsymlink(self):
585 def _createsymlink(self):
584 if self._baseaddress == self._realaddress:
586 if self._baseaddress == self._realaddress:
585 return
587 return
586 tempaddress = _tempaddress(self._baseaddress)
588 tempaddress = _tempaddress(self._baseaddress)
587 os.symlink(os.path.basename(self._realaddress), tempaddress)
589 os.symlink(os.path.basename(self._realaddress), tempaddress)
588 util.rename(tempaddress, self._baseaddress)
590 util.rename(tempaddress, self._baseaddress)
589
591
590 def _issocketowner(self):
592 def _issocketowner(self):
591 try:
593 try:
592 st = os.stat(self._realaddress)
594 st = os.stat(self._realaddress)
593 return (st.st_ino == self._socketstat.st_ino and
595 return (st.st_ino == self._socketstat.st_ino and
594 st[stat.ST_MTIME] == self._socketstat[stat.ST_MTIME])
596 st[stat.ST_MTIME] == self._socketstat[stat.ST_MTIME])
595 except OSError:
597 except OSError:
596 return False
598 return False
597
599
598 def unlinksocket(self, address):
600 def unlinksocket(self, address):
599 if not self._issocketowner():
601 if not self._issocketowner():
600 return
602 return
601 # it is possible to have a race condition here that we may
603 # it is possible to have a race condition here that we may
602 # remove another server's socket file. but that's okay
604 # remove another server's socket file. but that's okay
603 # since that server will detect and exit automatically and
605 # since that server will detect and exit automatically and
604 # the client will start a new server on demand.
606 # the client will start a new server on demand.
605 util.tryunlink(self._realaddress)
607 util.tryunlink(self._realaddress)
606
608
607 def shouldexit(self):
609 def shouldexit(self):
608 if not self._issocketowner():
610 if not self._issocketowner():
609 self.ui.debug('%s is not owned, exiting.\n' % self._realaddress)
611 self.ui.debug('%s is not owned, exiting.\n' % self._realaddress)
610 return True
612 return True
611 if time.time() - self._lastactive > self._idletimeout:
613 if time.time() - self._lastactive > self._idletimeout:
612 self.ui.debug('being idle too long. exiting.\n')
614 self.ui.debug('being idle too long. exiting.\n')
613 return True
615 return True
614 return False
616 return False
615
617
616 def newconnection(self):
618 def newconnection(self):
617 self._lastactive = time.time()
619 self._lastactive = time.time()
618
620
619 def createcmdserver(self, repo, conn, fin, fout):
621 def createcmdserver(self, repo, conn, fin, fout):
620 return chgcmdserver(self.ui, repo, fin, fout, conn,
622 return chgcmdserver(self.ui, repo, fin, fout, conn,
621 self._hashstate, self._baseaddress)
623 self._hashstate, self._baseaddress)
622
624
623 def chgunixservice(ui, repo, opts):
625 def chgunixservice(ui, repo, opts):
624 # CHGINTERNALMARK is set by chg client. It is an indication of things are
626 # CHGINTERNALMARK is set by chg client. It is an indication of things are
625 # started by chg so other code can do things accordingly, like disabling
627 # started by chg so other code can do things accordingly, like disabling
626 # demandimport or detecting chg client started by chg client. When executed
628 # demandimport or detecting chg client started by chg client. When executed
627 # here, CHGINTERNALMARK is no longer useful and hence dropped to make
629 # here, CHGINTERNALMARK is no longer useful and hence dropped to make
628 # environ cleaner.
630 # environ cleaner.
629 if 'CHGINTERNALMARK' in encoding.environ:
631 if 'CHGINTERNALMARK' in encoding.environ:
630 del encoding.environ['CHGINTERNALMARK']
632 del encoding.environ['CHGINTERNALMARK']
631
633
632 if repo:
634 if repo:
633 # one chgserver can serve multiple repos. drop repo information
635 # one chgserver can serve multiple repos. drop repo information
634 ui.setconfig('bundle', 'mainreporoot', '', 'repo')
636 ui.setconfig('bundle', 'mainreporoot', '', 'repo')
635 h = chgunixservicehandler(ui)
637 h = chgunixservicehandler(ui)
636 return commandserver.unixforkingservice(ui, repo=None, opts=opts, handler=h)
638 return commandserver.unixforkingservice(ui, repo=None, opts=opts, handler=h)
@@ -1,613 +1,639 b''
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Matt Mackall <mpm@selenic.com>
3 # Copyright Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import gc
11 import gc
12 import os
12 import os
13 import random
13 import random
14 import signal
14 import signal
15 import socket
15 import socket
16 import struct
16 import struct
17 import traceback
17 import traceback
18
18
19 try:
19 try:
20 import selectors
20 import selectors
21 selectors.BaseSelector
21 selectors.BaseSelector
22 except ImportError:
22 except ImportError:
23 from .thirdparty import selectors2 as selectors
23 from .thirdparty import selectors2 as selectors
24
24
25 from .i18n import _
25 from .i18n import _
26 from . import (
26 from . import (
27 encoding,
27 encoding,
28 error,
28 error,
29 loggingutil,
29 pycompat,
30 pycompat,
30 util,
31 util,
32 vfs as vfsmod,
31 )
33 )
32 from .utils import (
34 from .utils import (
33 cborutil,
35 cborutil,
34 procutil,
36 procutil,
35 )
37 )
36
38
37 logfile = None
39 logfile = None
38
40
39 def log(*args):
41 def log(*args):
40 if not logfile:
42 if not logfile:
41 return
43 return
42
44
43 for a in args:
45 for a in args:
44 logfile.write(str(a))
46 logfile.write(str(a))
45
47
46 logfile.flush()
48 logfile.flush()
47
49
48 class channeledoutput(object):
50 class channeledoutput(object):
49 """
51 """
50 Write data to out in the following format:
52 Write data to out in the following format:
51
53
52 data length (unsigned int),
54 data length (unsigned int),
53 data
55 data
54 """
56 """
55 def __init__(self, out, channel):
57 def __init__(self, out, channel):
56 self.out = out
58 self.out = out
57 self.channel = channel
59 self.channel = channel
58
60
59 @property
61 @property
60 def name(self):
62 def name(self):
61 return '<%c-channel>' % self.channel
63 return '<%c-channel>' % self.channel
62
64
63 def write(self, data):
65 def write(self, data):
64 if not data:
66 if not data:
65 return
67 return
66 # single write() to guarantee the same atomicity as the underlying file
68 # single write() to guarantee the same atomicity as the underlying file
67 self.out.write(struct.pack('>cI', self.channel, len(data)) + data)
69 self.out.write(struct.pack('>cI', self.channel, len(data)) + data)
68 self.out.flush()
70 self.out.flush()
69
71
70 def __getattr__(self, attr):
72 def __getattr__(self, attr):
71 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
73 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
72 raise AttributeError(attr)
74 raise AttributeError(attr)
73 return getattr(self.out, attr)
75 return getattr(self.out, attr)
74
76
75 class channeledmessage(object):
77 class channeledmessage(object):
76 """
78 """
77 Write encoded message and metadata to out in the following format:
79 Write encoded message and metadata to out in the following format:
78
80
79 data length (unsigned int),
81 data length (unsigned int),
80 encoded message and metadata, as a flat key-value dict.
82 encoded message and metadata, as a flat key-value dict.
81
83
82 Each message should have 'type' attribute. Messages of unknown type
84 Each message should have 'type' attribute. Messages of unknown type
83 should be ignored.
85 should be ignored.
84 """
86 """
85
87
86 # teach ui that write() can take **opts
88 # teach ui that write() can take **opts
87 structured = True
89 structured = True
88
90
89 def __init__(self, out, channel, encodename, encodefn):
91 def __init__(self, out, channel, encodename, encodefn):
90 self._cout = channeledoutput(out, channel)
92 self._cout = channeledoutput(out, channel)
91 self.encoding = encodename
93 self.encoding = encodename
92 self._encodefn = encodefn
94 self._encodefn = encodefn
93
95
94 def write(self, data, **opts):
96 def write(self, data, **opts):
95 opts = pycompat.byteskwargs(opts)
97 opts = pycompat.byteskwargs(opts)
96 if data is not None:
98 if data is not None:
97 opts[b'data'] = data
99 opts[b'data'] = data
98 self._cout.write(self._encodefn(opts))
100 self._cout.write(self._encodefn(opts))
99
101
100 def __getattr__(self, attr):
102 def __getattr__(self, attr):
101 return getattr(self._cout, attr)
103 return getattr(self._cout, attr)
102
104
103 class channeledinput(object):
105 class channeledinput(object):
104 """
106 """
105 Read data from in_.
107 Read data from in_.
106
108
107 Requests for input are written to out in the following format:
109 Requests for input are written to out in the following format:
108 channel identifier - 'I' for plain input, 'L' line based (1 byte)
110 channel identifier - 'I' for plain input, 'L' line based (1 byte)
109 how many bytes to send at most (unsigned int),
111 how many bytes to send at most (unsigned int),
110
112
111 The client replies with:
113 The client replies with:
112 data length (unsigned int), 0 meaning EOF
114 data length (unsigned int), 0 meaning EOF
113 data
115 data
114 """
116 """
115
117
116 maxchunksize = 4 * 1024
118 maxchunksize = 4 * 1024
117
119
118 def __init__(self, in_, out, channel):
120 def __init__(self, in_, out, channel):
119 self.in_ = in_
121 self.in_ = in_
120 self.out = out
122 self.out = out
121 self.channel = channel
123 self.channel = channel
122
124
123 @property
125 @property
124 def name(self):
126 def name(self):
125 return '<%c-channel>' % self.channel
127 return '<%c-channel>' % self.channel
126
128
127 def read(self, size=-1):
129 def read(self, size=-1):
128 if size < 0:
130 if size < 0:
129 # if we need to consume all the clients input, ask for 4k chunks
131 # if we need to consume all the clients input, ask for 4k chunks
130 # so the pipe doesn't fill up risking a deadlock
132 # so the pipe doesn't fill up risking a deadlock
131 size = self.maxchunksize
133 size = self.maxchunksize
132 s = self._read(size, self.channel)
134 s = self._read(size, self.channel)
133 buf = s
135 buf = s
134 while s:
136 while s:
135 s = self._read(size, self.channel)
137 s = self._read(size, self.channel)
136 buf += s
138 buf += s
137
139
138 return buf
140 return buf
139 else:
141 else:
140 return self._read(size, self.channel)
142 return self._read(size, self.channel)
141
143
142 def _read(self, size, channel):
144 def _read(self, size, channel):
143 if not size:
145 if not size:
144 return ''
146 return ''
145 assert size > 0
147 assert size > 0
146
148
147 # tell the client we need at most size bytes
149 # tell the client we need at most size bytes
148 self.out.write(struct.pack('>cI', channel, size))
150 self.out.write(struct.pack('>cI', channel, size))
149 self.out.flush()
151 self.out.flush()
150
152
151 length = self.in_.read(4)
153 length = self.in_.read(4)
152 length = struct.unpack('>I', length)[0]
154 length = struct.unpack('>I', length)[0]
153 if not length:
155 if not length:
154 return ''
156 return ''
155 else:
157 else:
156 return self.in_.read(length)
158 return self.in_.read(length)
157
159
158 def readline(self, size=-1):
160 def readline(self, size=-1):
159 if size < 0:
161 if size < 0:
160 size = self.maxchunksize
162 size = self.maxchunksize
161 s = self._read(size, 'L')
163 s = self._read(size, 'L')
162 buf = s
164 buf = s
163 # keep asking for more until there's either no more or
165 # keep asking for more until there's either no more or
164 # we got a full line
166 # we got a full line
165 while s and s[-1] != '\n':
167 while s and s[-1] != '\n':
166 s = self._read(size, 'L')
168 s = self._read(size, 'L')
167 buf += s
169 buf += s
168
170
169 return buf
171 return buf
170 else:
172 else:
171 return self._read(size, 'L')
173 return self._read(size, 'L')
172
174
173 def __iter__(self):
175 def __iter__(self):
174 return self
176 return self
175
177
176 def next(self):
178 def next(self):
177 l = self.readline()
179 l = self.readline()
178 if not l:
180 if not l:
179 raise StopIteration
181 raise StopIteration
180 return l
182 return l
181
183
182 __next__ = next
184 __next__ = next
183
185
184 def __getattr__(self, attr):
186 def __getattr__(self, attr):
185 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
187 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
186 raise AttributeError(attr)
188 raise AttributeError(attr)
187 return getattr(self.in_, attr)
189 return getattr(self.in_, attr)
188
190
189 _messageencoders = {
191 _messageencoders = {
190 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
192 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
191 }
193 }
192
194
193 def _selectmessageencoder(ui):
195 def _selectmessageencoder(ui):
194 # experimental config: cmdserver.message-encodings
196 # experimental config: cmdserver.message-encodings
195 encnames = ui.configlist(b'cmdserver', b'message-encodings')
197 encnames = ui.configlist(b'cmdserver', b'message-encodings')
196 for n in encnames:
198 for n in encnames:
197 f = _messageencoders.get(n)
199 f = _messageencoders.get(n)
198 if f:
200 if f:
199 return n, f
201 return n, f
200 raise error.Abort(b'no supported message encodings: %s'
202 raise error.Abort(b'no supported message encodings: %s'
201 % b' '.join(encnames))
203 % b' '.join(encnames))
202
204
203 class server(object):
205 class server(object):
204 """
206 """
205 Listens for commands on fin, runs them and writes the output on a channel
207 Listens for commands on fin, runs them and writes the output on a channel
206 based stream to fout.
208 based stream to fout.
207 """
209 """
208 def __init__(self, ui, repo, fin, fout):
210 def __init__(self, ui, repo, fin, fout):
209 self.cwd = encoding.getcwd()
211 self.cwd = encoding.getcwd()
210
212
211 if ui.config("cmdserver", "log") == '-':
213 if ui.config("cmdserver", "log") == '-':
212 global logfile
214 global logfile
213 # switch log stream to the 'd' (debug) channel
215 # switch log stream to the 'd' (debug) channel
214 logfile = channeledoutput(fout, 'd')
216 logfile = channeledoutput(fout, 'd')
215
217
216 if repo:
218 if repo:
217 # the ui here is really the repo ui so take its baseui so we don't
219 # the ui here is really the repo ui so take its baseui so we don't
218 # end up with its local configuration
220 # end up with its local configuration
219 self.ui = repo.baseui
221 self.ui = repo.baseui
220 self.repo = repo
222 self.repo = repo
221 self.repoui = repo.ui
223 self.repoui = repo.ui
222 else:
224 else:
223 self.ui = ui
225 self.ui = ui
224 self.repo = self.repoui = None
226 self.repo = self.repoui = None
225
227
228 self.cdebug = logfile
226 self.cerr = channeledoutput(fout, 'e')
229 self.cerr = channeledoutput(fout, 'e')
227 self.cout = channeledoutput(fout, 'o')
230 self.cout = channeledoutput(fout, 'o')
228 self.cin = channeledinput(fin, fout, 'I')
231 self.cin = channeledinput(fin, fout, 'I')
229 self.cresult = channeledoutput(fout, 'r')
232 self.cresult = channeledoutput(fout, 'r')
230
233
234 if self.ui.config(b'cmdserver', b'log') == b'-':
235 # switch log stream of server's ui to the 'd' (debug) channel
236 # (don't touch repo.ui as its lifetime is longer than the server)
237 self.ui = self.ui.copy()
238 setuplogging(self.ui, repo=None, fp=self.cdebug)
239
231 # TODO: add this to help/config.txt when stabilized
240 # TODO: add this to help/config.txt when stabilized
232 # ``channel``
241 # ``channel``
233 # Use separate channel for structured output. (Command-server only)
242 # Use separate channel for structured output. (Command-server only)
234 self.cmsg = None
243 self.cmsg = None
235 if ui.config(b'ui', b'message-output') == b'channel':
244 if ui.config(b'ui', b'message-output') == b'channel':
236 encname, encfn = _selectmessageencoder(ui)
245 encname, encfn = _selectmessageencoder(ui)
237 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
246 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
238
247
239 self.client = fin
248 self.client = fin
240
249
241 def cleanup(self):
250 def cleanup(self):
242 """release and restore resources taken during server session"""
251 """release and restore resources taken during server session"""
243
252
244 def _read(self, size):
253 def _read(self, size):
245 if not size:
254 if not size:
246 return ''
255 return ''
247
256
248 data = self.client.read(size)
257 data = self.client.read(size)
249
258
250 # is the other end closed?
259 # is the other end closed?
251 if not data:
260 if not data:
252 raise EOFError
261 raise EOFError
253
262
254 return data
263 return data
255
264
256 def _readstr(self):
265 def _readstr(self):
257 """read a string from the channel
266 """read a string from the channel
258
267
259 format:
268 format:
260 data length (uint32), data
269 data length (uint32), data
261 """
270 """
262 length = struct.unpack('>I', self._read(4))[0]
271 length = struct.unpack('>I', self._read(4))[0]
263 if not length:
272 if not length:
264 return ''
273 return ''
265 return self._read(length)
274 return self._read(length)
266
275
267 def _readlist(self):
276 def _readlist(self):
268 """read a list of NULL separated strings from the channel"""
277 """read a list of NULL separated strings from the channel"""
269 s = self._readstr()
278 s = self._readstr()
270 if s:
279 if s:
271 return s.split('\0')
280 return s.split('\0')
272 else:
281 else:
273 return []
282 return []
274
283
275 def runcommand(self):
284 def runcommand(self):
276 """ reads a list of \0 terminated arguments, executes
285 """ reads a list of \0 terminated arguments, executes
277 and writes the return code to the result channel """
286 and writes the return code to the result channel """
278 from . import dispatch # avoid cycle
287 from . import dispatch # avoid cycle
279
288
280 args = self._readlist()
289 args = self._readlist()
281
290
282 # copy the uis so changes (e.g. --config or --verbose) don't
291 # copy the uis so changes (e.g. --config or --verbose) don't
283 # persist between requests
292 # persist between requests
284 copiedui = self.ui.copy()
293 copiedui = self.ui.copy()
285 uis = [copiedui]
294 uis = [copiedui]
286 if self.repo:
295 if self.repo:
287 self.repo.baseui = copiedui
296 self.repo.baseui = copiedui
288 # clone ui without using ui.copy because this is protected
297 # clone ui without using ui.copy because this is protected
289 repoui = self.repoui.__class__(self.repoui)
298 repoui = self.repoui.__class__(self.repoui)
290 repoui.copy = copiedui.copy # redo copy protection
299 repoui.copy = copiedui.copy # redo copy protection
291 uis.append(repoui)
300 uis.append(repoui)
292 self.repo.ui = self.repo.dirstate._ui = repoui
301 self.repo.ui = self.repo.dirstate._ui = repoui
293 self.repo.invalidateall()
302 self.repo.invalidateall()
294
303
295 for ui in uis:
304 for ui in uis:
296 ui.resetstate()
305 ui.resetstate()
297 # any kind of interaction must use server channels, but chg may
306 # any kind of interaction must use server channels, but chg may
298 # replace channels by fully functional tty files. so nontty is
307 # replace channels by fully functional tty files. so nontty is
299 # enforced only if cin is a channel.
308 # enforced only if cin is a channel.
300 if not util.safehasattr(self.cin, 'fileno'):
309 if not util.safehasattr(self.cin, 'fileno'):
301 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
310 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
302
311
303 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
312 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
304 self.cout, self.cerr, self.cmsg)
313 self.cout, self.cerr, self.cmsg)
305
314
306 try:
315 try:
307 ret = dispatch.dispatch(req) & 255
316 ret = dispatch.dispatch(req) & 255
308 self.cresult.write(struct.pack('>i', int(ret)))
317 self.cresult.write(struct.pack('>i', int(ret)))
309 finally:
318 finally:
310 # restore old cwd
319 # restore old cwd
311 if '--cwd' in args:
320 if '--cwd' in args:
312 os.chdir(self.cwd)
321 os.chdir(self.cwd)
313
322
314 def getencoding(self):
323 def getencoding(self):
315 """ writes the current encoding to the result channel """
324 """ writes the current encoding to the result channel """
316 self.cresult.write(encoding.encoding)
325 self.cresult.write(encoding.encoding)
317
326
318 def serveone(self):
327 def serveone(self):
319 cmd = self.client.readline()[:-1]
328 cmd = self.client.readline()[:-1]
320 if cmd:
329 if cmd:
321 handler = self.capabilities.get(cmd)
330 handler = self.capabilities.get(cmd)
322 if handler:
331 if handler:
323 handler(self)
332 handler(self)
324 else:
333 else:
325 # clients are expected to check what commands are supported by
334 # clients are expected to check what commands are supported by
326 # looking at the servers capabilities
335 # looking at the servers capabilities
327 raise error.Abort(_('unknown command %s') % cmd)
336 raise error.Abort(_('unknown command %s') % cmd)
328
337
329 return cmd != ''
338 return cmd != ''
330
339
331 capabilities = {'runcommand': runcommand,
340 capabilities = {'runcommand': runcommand,
332 'getencoding': getencoding}
341 'getencoding': getencoding}
333
342
334 def serve(self):
343 def serve(self):
335 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
344 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
336 hellomsg += '\n'
345 hellomsg += '\n'
337 hellomsg += 'encoding: ' + encoding.encoding
346 hellomsg += 'encoding: ' + encoding.encoding
338 hellomsg += '\n'
347 hellomsg += '\n'
339 if self.cmsg:
348 if self.cmsg:
340 hellomsg += 'message-encoding: %s\n' % self.cmsg.encoding
349 hellomsg += 'message-encoding: %s\n' % self.cmsg.encoding
341 hellomsg += 'pid: %d' % procutil.getpid()
350 hellomsg += 'pid: %d' % procutil.getpid()
342 if util.safehasattr(os, 'getpgid'):
351 if util.safehasattr(os, 'getpgid'):
343 hellomsg += '\n'
352 hellomsg += '\n'
344 hellomsg += 'pgid: %d' % os.getpgid(0)
353 hellomsg += 'pgid: %d' % os.getpgid(0)
345
354
346 # write the hello msg in -one- chunk
355 # write the hello msg in -one- chunk
347 self.cout.write(hellomsg)
356 self.cout.write(hellomsg)
348
357
349 try:
358 try:
350 while self.serveone():
359 while self.serveone():
351 pass
360 pass
352 except EOFError:
361 except EOFError:
353 # we'll get here if the client disconnected while we were reading
362 # we'll get here if the client disconnected while we were reading
354 # its request
363 # its request
355 return 1
364 return 1
356
365
357 return 0
366 return 0
358
367
359 def setuplogging(ui):
368 def setuplogging(ui, repo=None, fp=None):
360 """Set up server logging facility
369 """Set up server logging facility
361
370
362 If cmdserver.log is '-', log messages will be sent to the 'd' channel
371 If cmdserver.log is '-', log messages will be sent to the given fp.
363 while a client is connected. Otherwise, messages will be written to
372 It should be the 'd' channel while a client is connected, and otherwise
364 the stderr of the server process.
373 is the stderr of the server process.
365 """
374 """
366 # developer config: cmdserver.log
375 # developer config: cmdserver.log
367 logpath = ui.config(b'cmdserver', b'log')
376 logpath = ui.config(b'cmdserver', b'log')
368 if not logpath:
377 if not logpath:
369 return
378 return
379 tracked = {b'cmdserver'}
370
380
371 global logfile
381 global logfile
372 if logpath == b'-':
382 if logpath == b'-':
373 logfile = ui.ferr
383 logfile = ui.ferr
374 else:
384 else:
375 logfile = open(logpath, 'ab')
385 logfile = open(logpath, 'ab')
376
386
387 if logpath == b'-' and fp:
388 logger = loggingutil.fileobjectlogger(fp, tracked)
389 elif logpath == b'-':
390 logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
391 else:
392 logpath = os.path.abspath(logpath)
393 vfs = vfsmod.vfs(os.path.dirname(logpath))
394 logger = loggingutil.filelogger(vfs, os.path.basename(logpath), tracked)
395
396 targetuis = {ui}
397 if repo:
398 targetuis.add(repo.baseui)
399 targetuis.add(repo.ui)
400 for u in targetuis:
401 u.setlogger(b'cmdserver', logger)
402
377 class pipeservice(object):
403 class pipeservice(object):
378 def __init__(self, ui, repo, opts):
404 def __init__(self, ui, repo, opts):
379 self.ui = ui
405 self.ui = ui
380 self.repo = repo
406 self.repo = repo
381
407
382 def init(self):
408 def init(self):
383 pass
409 pass
384
410
385 def run(self):
411 def run(self):
386 ui = self.ui
412 ui = self.ui
387 # redirect stdio to null device so that broken extensions or in-process
413 # redirect stdio to null device so that broken extensions or in-process
388 # hooks will never cause corruption of channel protocol.
414 # hooks will never cause corruption of channel protocol.
389 with procutil.protectedstdio(ui.fin, ui.fout) as (fin, fout):
415 with procutil.protectedstdio(ui.fin, ui.fout) as (fin, fout):
390 sv = server(ui, self.repo, fin, fout)
416 sv = server(ui, self.repo, fin, fout)
391 try:
417 try:
392 return sv.serve()
418 return sv.serve()
393 finally:
419 finally:
394 sv.cleanup()
420 sv.cleanup()
395
421
396 def _initworkerprocess():
422 def _initworkerprocess():
397 # use a different process group from the master process, in order to:
423 # use a different process group from the master process, in order to:
398 # 1. make the current process group no longer "orphaned" (because the
424 # 1. make the current process group no longer "orphaned" (because the
399 # parent of this process is in a different process group while
425 # parent of this process is in a different process group while
400 # remains in a same session)
426 # remains in a same session)
401 # according to POSIX 2.2.2.52, orphaned process group will ignore
427 # according to POSIX 2.2.2.52, orphaned process group will ignore
402 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
428 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
403 # cause trouble for things like ncurses.
429 # cause trouble for things like ncurses.
404 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
430 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
405 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
431 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
406 # processes like ssh will be killed properly, without affecting
432 # processes like ssh will be killed properly, without affecting
407 # unrelated processes.
433 # unrelated processes.
408 os.setpgid(0, 0)
434 os.setpgid(0, 0)
409 # change random state otherwise forked request handlers would have a
435 # change random state otherwise forked request handlers would have a
410 # same state inherited from parent.
436 # same state inherited from parent.
411 random.seed()
437 random.seed()
412
438
413 def _serverequest(ui, repo, conn, createcmdserver):
439 def _serverequest(ui, repo, conn, createcmdserver):
414 fin = conn.makefile(r'rb')
440 fin = conn.makefile(r'rb')
415 fout = conn.makefile(r'wb')
441 fout = conn.makefile(r'wb')
416 sv = None
442 sv = None
417 try:
443 try:
418 sv = createcmdserver(repo, conn, fin, fout)
444 sv = createcmdserver(repo, conn, fin, fout)
419 try:
445 try:
420 sv.serve()
446 sv.serve()
421 # handle exceptions that may be raised by command server. most of
447 # handle exceptions that may be raised by command server. most of
422 # known exceptions are caught by dispatch.
448 # known exceptions are caught by dispatch.
423 except error.Abort as inst:
449 except error.Abort as inst:
424 ui.error(_('abort: %s\n') % inst)
450 ui.error(_('abort: %s\n') % inst)
425 except IOError as inst:
451 except IOError as inst:
426 if inst.errno != errno.EPIPE:
452 if inst.errno != errno.EPIPE:
427 raise
453 raise
428 except KeyboardInterrupt:
454 except KeyboardInterrupt:
429 pass
455 pass
430 finally:
456 finally:
431 sv.cleanup()
457 sv.cleanup()
432 except: # re-raises
458 except: # re-raises
433 # also write traceback to error channel. otherwise client cannot
459 # also write traceback to error channel. otherwise client cannot
434 # see it because it is written to server's stderr by default.
460 # see it because it is written to server's stderr by default.
435 if sv:
461 if sv:
436 cerr = sv.cerr
462 cerr = sv.cerr
437 else:
463 else:
438 cerr = channeledoutput(fout, 'e')
464 cerr = channeledoutput(fout, 'e')
439 cerr.write(encoding.strtolocal(traceback.format_exc()))
465 cerr.write(encoding.strtolocal(traceback.format_exc()))
440 raise
466 raise
441 finally:
467 finally:
442 fin.close()
468 fin.close()
443 try:
469 try:
444 fout.close() # implicit flush() may cause another EPIPE
470 fout.close() # implicit flush() may cause another EPIPE
445 except IOError as inst:
471 except IOError as inst:
446 if inst.errno != errno.EPIPE:
472 if inst.errno != errno.EPIPE:
447 raise
473 raise
448
474
449 class unixservicehandler(object):
475 class unixservicehandler(object):
450 """Set of pluggable operations for unix-mode services
476 """Set of pluggable operations for unix-mode services
451
477
452 Almost all methods except for createcmdserver() are called in the main
478 Almost all methods except for createcmdserver() are called in the main
453 process. You can't pass mutable resource back from createcmdserver().
479 process. You can't pass mutable resource back from createcmdserver().
454 """
480 """
455
481
456 pollinterval = None
482 pollinterval = None
457
483
458 def __init__(self, ui):
484 def __init__(self, ui):
459 self.ui = ui
485 self.ui = ui
460
486
461 def bindsocket(self, sock, address):
487 def bindsocket(self, sock, address):
462 util.bindunixsocket(sock, address)
488 util.bindunixsocket(sock, address)
463 sock.listen(socket.SOMAXCONN)
489 sock.listen(socket.SOMAXCONN)
464 self.ui.status(_('listening at %s\n') % address)
490 self.ui.status(_('listening at %s\n') % address)
465 self.ui.flush() # avoid buffering of status message
491 self.ui.flush() # avoid buffering of status message
466
492
467 def unlinksocket(self, address):
493 def unlinksocket(self, address):
468 os.unlink(address)
494 os.unlink(address)
469
495
470 def shouldexit(self):
496 def shouldexit(self):
471 """True if server should shut down; checked per pollinterval"""
497 """True if server should shut down; checked per pollinterval"""
472 return False
498 return False
473
499
474 def newconnection(self):
500 def newconnection(self):
475 """Called when main process notices new connection"""
501 """Called when main process notices new connection"""
476
502
477 def createcmdserver(self, repo, conn, fin, fout):
503 def createcmdserver(self, repo, conn, fin, fout):
478 """Create new command server instance; called in the process that
504 """Create new command server instance; called in the process that
479 serves for the current connection"""
505 serves for the current connection"""
480 return server(self.ui, repo, fin, fout)
506 return server(self.ui, repo, fin, fout)
481
507
482 class unixforkingservice(object):
508 class unixforkingservice(object):
483 """
509 """
484 Listens on unix domain socket and forks server per connection
510 Listens on unix domain socket and forks server per connection
485 """
511 """
486
512
487 def __init__(self, ui, repo, opts, handler=None):
513 def __init__(self, ui, repo, opts, handler=None):
488 self.ui = ui
514 self.ui = ui
489 self.repo = repo
515 self.repo = repo
490 self.address = opts['address']
516 self.address = opts['address']
491 if not util.safehasattr(socket, 'AF_UNIX'):
517 if not util.safehasattr(socket, 'AF_UNIX'):
492 raise error.Abort(_('unsupported platform'))
518 raise error.Abort(_('unsupported platform'))
493 if not self.address:
519 if not self.address:
494 raise error.Abort(_('no socket path specified with --address'))
520 raise error.Abort(_('no socket path specified with --address'))
495 self._servicehandler = handler or unixservicehandler(ui)
521 self._servicehandler = handler or unixservicehandler(ui)
496 self._sock = None
522 self._sock = None
497 self._oldsigchldhandler = None
523 self._oldsigchldhandler = None
498 self._workerpids = set() # updated by signal handler; do not iterate
524 self._workerpids = set() # updated by signal handler; do not iterate
499 self._socketunlinked = None
525 self._socketunlinked = None
500
526
501 def init(self):
527 def init(self):
502 self._sock = socket.socket(socket.AF_UNIX)
528 self._sock = socket.socket(socket.AF_UNIX)
503 self._servicehandler.bindsocket(self._sock, self.address)
529 self._servicehandler.bindsocket(self._sock, self.address)
504 if util.safehasattr(procutil, 'unblocksignal'):
530 if util.safehasattr(procutil, 'unblocksignal'):
505 procutil.unblocksignal(signal.SIGCHLD)
531 procutil.unblocksignal(signal.SIGCHLD)
506 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
532 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
507 self._oldsigchldhandler = o
533 self._oldsigchldhandler = o
508 self._socketunlinked = False
534 self._socketunlinked = False
509
535
510 def _unlinksocket(self):
536 def _unlinksocket(self):
511 if not self._socketunlinked:
537 if not self._socketunlinked:
512 self._servicehandler.unlinksocket(self.address)
538 self._servicehandler.unlinksocket(self.address)
513 self._socketunlinked = True
539 self._socketunlinked = True
514
540
515 def _cleanup(self):
541 def _cleanup(self):
516 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
542 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
517 self._sock.close()
543 self._sock.close()
518 self._unlinksocket()
544 self._unlinksocket()
519 # don't kill child processes as they have active clients, just wait
545 # don't kill child processes as they have active clients, just wait
520 self._reapworkers(0)
546 self._reapworkers(0)
521
547
522 def run(self):
548 def run(self):
523 try:
549 try:
524 self._mainloop()
550 self._mainloop()
525 finally:
551 finally:
526 self._cleanup()
552 self._cleanup()
527
553
528 def _mainloop(self):
554 def _mainloop(self):
529 exiting = False
555 exiting = False
530 h = self._servicehandler
556 h = self._servicehandler
531 selector = selectors.DefaultSelector()
557 selector = selectors.DefaultSelector()
532 selector.register(self._sock, selectors.EVENT_READ)
558 selector.register(self._sock, selectors.EVENT_READ)
533 while True:
559 while True:
534 if not exiting and h.shouldexit():
560 if not exiting and h.shouldexit():
535 # clients can no longer connect() to the domain socket, so
561 # clients can no longer connect() to the domain socket, so
536 # we stop queuing new requests.
562 # we stop queuing new requests.
537 # for requests that are queued (connect()-ed, but haven't been
563 # for requests that are queued (connect()-ed, but haven't been
538 # accept()-ed), handle them before exit. otherwise, clients
564 # accept()-ed), handle them before exit. otherwise, clients
539 # waiting for recv() will receive ECONNRESET.
565 # waiting for recv() will receive ECONNRESET.
540 self._unlinksocket()
566 self._unlinksocket()
541 exiting = True
567 exiting = True
542 try:
568 try:
543 ready = selector.select(timeout=h.pollinterval)
569 ready = selector.select(timeout=h.pollinterval)
544 except OSError as inst:
570 except OSError as inst:
545 # selectors2 raises ETIMEDOUT if timeout exceeded while
571 # selectors2 raises ETIMEDOUT if timeout exceeded while
546 # handling signal interrupt. That's probably wrong, but
572 # handling signal interrupt. That's probably wrong, but
547 # we can easily get around it.
573 # we can easily get around it.
548 if inst.errno != errno.ETIMEDOUT:
574 if inst.errno != errno.ETIMEDOUT:
549 raise
575 raise
550 ready = []
576 ready = []
551 if not ready:
577 if not ready:
552 # only exit if we completed all queued requests
578 # only exit if we completed all queued requests
553 if exiting:
579 if exiting:
554 break
580 break
555 continue
581 continue
556 try:
582 try:
557 conn, _addr = self._sock.accept()
583 conn, _addr = self._sock.accept()
558 except socket.error as inst:
584 except socket.error as inst:
559 if inst.args[0] == errno.EINTR:
585 if inst.args[0] == errno.EINTR:
560 continue
586 continue
561 raise
587 raise
562
588
563 pid = os.fork()
589 pid = os.fork()
564 if pid:
590 if pid:
565 try:
591 try:
566 self.ui.debug('forked worker process (pid=%d)\n' % pid)
592 self.ui.debug('forked worker process (pid=%d)\n' % pid)
567 self._workerpids.add(pid)
593 self._workerpids.add(pid)
568 h.newconnection()
594 h.newconnection()
569 finally:
595 finally:
570 conn.close() # release handle in parent process
596 conn.close() # release handle in parent process
571 else:
597 else:
572 try:
598 try:
573 selector.close()
599 selector.close()
574 self._sock.close()
600 self._sock.close()
575 self._runworker(conn)
601 self._runworker(conn)
576 conn.close()
602 conn.close()
577 os._exit(0)
603 os._exit(0)
578 except: # never return, hence no re-raises
604 except: # never return, hence no re-raises
579 try:
605 try:
580 self.ui.traceback(force=True)
606 self.ui.traceback(force=True)
581 finally:
607 finally:
582 os._exit(255)
608 os._exit(255)
583 selector.close()
609 selector.close()
584
610
585 def _sigchldhandler(self, signal, frame):
611 def _sigchldhandler(self, signal, frame):
586 self._reapworkers(os.WNOHANG)
612 self._reapworkers(os.WNOHANG)
587
613
588 def _reapworkers(self, options):
614 def _reapworkers(self, options):
589 while self._workerpids:
615 while self._workerpids:
590 try:
616 try:
591 pid, _status = os.waitpid(-1, options)
617 pid, _status = os.waitpid(-1, options)
592 except OSError as inst:
618 except OSError as inst:
593 if inst.errno == errno.EINTR:
619 if inst.errno == errno.EINTR:
594 continue
620 continue
595 if inst.errno != errno.ECHILD:
621 if inst.errno != errno.ECHILD:
596 raise
622 raise
597 # no child processes at all (reaped by other waitpid()?)
623 # no child processes at all (reaped by other waitpid()?)
598 self._workerpids.clear()
624 self._workerpids.clear()
599 return
625 return
600 if pid == 0:
626 if pid == 0:
601 # no waitable child processes
627 # no waitable child processes
602 return
628 return
603 self.ui.debug('worker process exited (pid=%d)\n' % pid)
629 self.ui.debug('worker process exited (pid=%d)\n' % pid)
604 self._workerpids.discard(pid)
630 self._workerpids.discard(pid)
605
631
606 def _runworker(self, conn):
632 def _runworker(self, conn):
607 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
633 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
608 _initworkerprocess()
634 _initworkerprocess()
609 h = self._servicehandler
635 h = self._servicehandler
610 try:
636 try:
611 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
637 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
612 finally:
638 finally:
613 gc.collect() # trigger __del__ since worker process uses os._exit
639 gc.collect() # trigger __del__ since worker process uses os._exit
@@ -1,212 +1,212 b''
1 # server.py - utility and factory of server
1 # server.py - utility and factory of server
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import os
10 import os
11
11
12 from .i18n import _
12 from .i18n import _
13
13
14 from . import (
14 from . import (
15 chgserver,
15 chgserver,
16 cmdutil,
16 cmdutil,
17 commandserver,
17 commandserver,
18 error,
18 error,
19 hgweb,
19 hgweb,
20 pycompat,
20 pycompat,
21 util,
21 util,
22 )
22 )
23
23
24 from .utils import (
24 from .utils import (
25 procutil,
25 procutil,
26 )
26 )
27
27
28 def runservice(opts, parentfn=None, initfn=None, runfn=None, logfile=None,
28 def runservice(opts, parentfn=None, initfn=None, runfn=None, logfile=None,
29 runargs=None, appendpid=False):
29 runargs=None, appendpid=False):
30 '''Run a command as a service.'''
30 '''Run a command as a service.'''
31
31
32 postexecargs = {}
32 postexecargs = {}
33
33
34 if opts['daemon_postexec']:
34 if opts['daemon_postexec']:
35 for inst in opts['daemon_postexec']:
35 for inst in opts['daemon_postexec']:
36 if inst.startswith('unlink:'):
36 if inst.startswith('unlink:'):
37 postexecargs['unlink'] = inst[7:]
37 postexecargs['unlink'] = inst[7:]
38 elif inst.startswith('chdir:'):
38 elif inst.startswith('chdir:'):
39 postexecargs['chdir'] = inst[6:]
39 postexecargs['chdir'] = inst[6:]
40 elif inst != 'none':
40 elif inst != 'none':
41 raise error.Abort(_('invalid value for --daemon-postexec: %s')
41 raise error.Abort(_('invalid value for --daemon-postexec: %s')
42 % inst)
42 % inst)
43
43
44 # When daemonized on Windows, redirect stdout/stderr to the lockfile (which
44 # When daemonized on Windows, redirect stdout/stderr to the lockfile (which
45 # gets cleaned up after the child is up and running), so that the parent can
45 # gets cleaned up after the child is up and running), so that the parent can
46 # read and print the error if this child dies early. See 594dd384803c. On
46 # read and print the error if this child dies early. See 594dd384803c. On
47 # other platforms, the child can write to the parent's stdio directly, until
47 # other platforms, the child can write to the parent's stdio directly, until
48 # it is redirected prior to runfn().
48 # it is redirected prior to runfn().
49 if pycompat.iswindows and opts['daemon_postexec']:
49 if pycompat.iswindows and opts['daemon_postexec']:
50 if 'unlink' in postexecargs and os.path.exists(postexecargs['unlink']):
50 if 'unlink' in postexecargs and os.path.exists(postexecargs['unlink']):
51 procutil.stdout.flush()
51 procutil.stdout.flush()
52 procutil.stderr.flush()
52 procutil.stderr.flush()
53
53
54 fd = os.open(postexecargs['unlink'],
54 fd = os.open(postexecargs['unlink'],
55 os.O_WRONLY | os.O_APPEND | os.O_BINARY)
55 os.O_WRONLY | os.O_APPEND | os.O_BINARY)
56 try:
56 try:
57 os.dup2(fd, procutil.stdout.fileno())
57 os.dup2(fd, procutil.stdout.fileno())
58 os.dup2(fd, procutil.stderr.fileno())
58 os.dup2(fd, procutil.stderr.fileno())
59 finally:
59 finally:
60 os.close(fd)
60 os.close(fd)
61
61
62 def writepid(pid):
62 def writepid(pid):
63 if opts['pid_file']:
63 if opts['pid_file']:
64 if appendpid:
64 if appendpid:
65 mode = 'ab'
65 mode = 'ab'
66 else:
66 else:
67 mode = 'wb'
67 mode = 'wb'
68 fp = open(opts['pid_file'], mode)
68 fp = open(opts['pid_file'], mode)
69 fp.write('%d\n' % pid)
69 fp.write('%d\n' % pid)
70 fp.close()
70 fp.close()
71
71
72 if opts['daemon'] and not opts['daemon_postexec']:
72 if opts['daemon'] and not opts['daemon_postexec']:
73 # Signal child process startup with file removal
73 # Signal child process startup with file removal
74 lockfd, lockpath = pycompat.mkstemp(prefix='hg-service-')
74 lockfd, lockpath = pycompat.mkstemp(prefix='hg-service-')
75 os.close(lockfd)
75 os.close(lockfd)
76 try:
76 try:
77 if not runargs:
77 if not runargs:
78 runargs = procutil.hgcmd() + pycompat.sysargv[1:]
78 runargs = procutil.hgcmd() + pycompat.sysargv[1:]
79 runargs.append('--daemon-postexec=unlink:%s' % lockpath)
79 runargs.append('--daemon-postexec=unlink:%s' % lockpath)
80 # Don't pass --cwd to the child process, because we've already
80 # Don't pass --cwd to the child process, because we've already
81 # changed directory.
81 # changed directory.
82 for i in pycompat.xrange(1, len(runargs)):
82 for i in pycompat.xrange(1, len(runargs)):
83 if runargs[i].startswith('--cwd='):
83 if runargs[i].startswith('--cwd='):
84 del runargs[i]
84 del runargs[i]
85 break
85 break
86 elif runargs[i].startswith('--cwd'):
86 elif runargs[i].startswith('--cwd'):
87 del runargs[i:i + 2]
87 del runargs[i:i + 2]
88 break
88 break
89 def condfn():
89 def condfn():
90 return not os.path.exists(lockpath)
90 return not os.path.exists(lockpath)
91 pid = procutil.rundetached(runargs, condfn)
91 pid = procutil.rundetached(runargs, condfn)
92 if pid < 0:
92 if pid < 0:
93 # If the daemonized process managed to write out an error msg,
93 # If the daemonized process managed to write out an error msg,
94 # report it.
94 # report it.
95 if pycompat.iswindows and os.path.exists(lockpath):
95 if pycompat.iswindows and os.path.exists(lockpath):
96 with open(lockpath, 'rb') as log:
96 with open(lockpath, 'rb') as log:
97 for line in log:
97 for line in log:
98 procutil.stderr.write(line)
98 procutil.stderr.write(line)
99 raise error.Abort(_('child process failed to start'))
99 raise error.Abort(_('child process failed to start'))
100 writepid(pid)
100 writepid(pid)
101 finally:
101 finally:
102 util.tryunlink(lockpath)
102 util.tryunlink(lockpath)
103 if parentfn:
103 if parentfn:
104 return parentfn(pid)
104 return parentfn(pid)
105 else:
105 else:
106 return
106 return
107
107
108 if initfn:
108 if initfn:
109 initfn()
109 initfn()
110
110
111 if not opts['daemon']:
111 if not opts['daemon']:
112 writepid(procutil.getpid())
112 writepid(procutil.getpid())
113
113
114 if opts['daemon_postexec']:
114 if opts['daemon_postexec']:
115 try:
115 try:
116 os.setsid()
116 os.setsid()
117 except AttributeError:
117 except AttributeError:
118 pass
118 pass
119
119
120 if 'chdir' in postexecargs:
120 if 'chdir' in postexecargs:
121 os.chdir(postexecargs['chdir'])
121 os.chdir(postexecargs['chdir'])
122 procutil.hidewindow()
122 procutil.hidewindow()
123 procutil.stdout.flush()
123 procutil.stdout.flush()
124 procutil.stderr.flush()
124 procutil.stderr.flush()
125
125
126 nullfd = os.open(os.devnull, os.O_RDWR)
126 nullfd = os.open(os.devnull, os.O_RDWR)
127 logfilefd = nullfd
127 logfilefd = nullfd
128 if logfile:
128 if logfile:
129 logfilefd = os.open(logfile, os.O_RDWR | os.O_CREAT | os.O_APPEND,
129 logfilefd = os.open(logfile, os.O_RDWR | os.O_CREAT | os.O_APPEND,
130 0o666)
130 0o666)
131 os.dup2(nullfd, procutil.stdin.fileno())
131 os.dup2(nullfd, procutil.stdin.fileno())
132 os.dup2(logfilefd, procutil.stdout.fileno())
132 os.dup2(logfilefd, procutil.stdout.fileno())
133 os.dup2(logfilefd, procutil.stderr.fileno())
133 os.dup2(logfilefd, procutil.stderr.fileno())
134 stdio = (procutil.stdin.fileno(), procutil.stdout.fileno(),
134 stdio = (procutil.stdin.fileno(), procutil.stdout.fileno(),
135 procutil.stderr.fileno())
135 procutil.stderr.fileno())
136 if nullfd not in stdio:
136 if nullfd not in stdio:
137 os.close(nullfd)
137 os.close(nullfd)
138 if logfile and logfilefd not in stdio:
138 if logfile and logfilefd not in stdio:
139 os.close(logfilefd)
139 os.close(logfilefd)
140
140
141 # Only unlink after redirecting stdout/stderr, so Windows doesn't
141 # Only unlink after redirecting stdout/stderr, so Windows doesn't
142 # complain about a sharing violation.
142 # complain about a sharing violation.
143 if 'unlink' in postexecargs:
143 if 'unlink' in postexecargs:
144 os.unlink(postexecargs['unlink'])
144 os.unlink(postexecargs['unlink'])
145
145
146 if runfn:
146 if runfn:
147 return runfn()
147 return runfn()
148
148
149 _cmdservicemap = {
149 _cmdservicemap = {
150 'chgunix': chgserver.chgunixservice,
150 'chgunix': chgserver.chgunixservice,
151 'pipe': commandserver.pipeservice,
151 'pipe': commandserver.pipeservice,
152 'unix': commandserver.unixforkingservice,
152 'unix': commandserver.unixforkingservice,
153 }
153 }
154
154
155 def _createcmdservice(ui, repo, opts):
155 def _createcmdservice(ui, repo, opts):
156 mode = opts['cmdserver']
156 mode = opts['cmdserver']
157 try:
157 try:
158 servicefn = _cmdservicemap[mode]
158 servicefn = _cmdservicemap[mode]
159 except KeyError:
159 except KeyError:
160 raise error.Abort(_('unknown mode %s') % mode)
160 raise error.Abort(_('unknown mode %s') % mode)
161 commandserver.setuplogging(ui)
161 commandserver.setuplogging(ui, repo)
162 return servicefn(ui, repo, opts)
162 return servicefn(ui, repo, opts)
163
163
164 def _createhgwebservice(ui, repo, opts):
164 def _createhgwebservice(ui, repo, opts):
165 # this way we can check if something was given in the command-line
165 # this way we can check if something was given in the command-line
166 if opts.get('port'):
166 if opts.get('port'):
167 opts['port'] = util.getport(opts.get('port'))
167 opts['port'] = util.getport(opts.get('port'))
168
168
169 alluis = {ui}
169 alluis = {ui}
170 if repo:
170 if repo:
171 baseui = repo.baseui
171 baseui = repo.baseui
172 alluis.update([repo.baseui, repo.ui])
172 alluis.update([repo.baseui, repo.ui])
173 else:
173 else:
174 baseui = ui
174 baseui = ui
175 webconf = opts.get('web_conf') or opts.get('webdir_conf')
175 webconf = opts.get('web_conf') or opts.get('webdir_conf')
176 if webconf:
176 if webconf:
177 if opts.get('subrepos'):
177 if opts.get('subrepos'):
178 raise error.Abort(_('--web-conf cannot be used with --subrepos'))
178 raise error.Abort(_('--web-conf cannot be used with --subrepos'))
179
179
180 # load server settings (e.g. web.port) to "copied" ui, which allows
180 # load server settings (e.g. web.port) to "copied" ui, which allows
181 # hgwebdir to reload webconf cleanly
181 # hgwebdir to reload webconf cleanly
182 servui = ui.copy()
182 servui = ui.copy()
183 servui.readconfig(webconf, sections=['web'])
183 servui.readconfig(webconf, sections=['web'])
184 alluis.add(servui)
184 alluis.add(servui)
185 elif opts.get('subrepos'):
185 elif opts.get('subrepos'):
186 servui = ui
186 servui = ui
187
187
188 # If repo is None, hgweb.createapp() already raises a proper abort
188 # If repo is None, hgweb.createapp() already raises a proper abort
189 # message as long as webconf is None.
189 # message as long as webconf is None.
190 if repo:
190 if repo:
191 webconf = dict()
191 webconf = dict()
192 cmdutil.addwebdirpath(repo, "", webconf)
192 cmdutil.addwebdirpath(repo, "", webconf)
193 else:
193 else:
194 servui = ui
194 servui = ui
195
195
196 optlist = ("name templates style address port prefix ipv6"
196 optlist = ("name templates style address port prefix ipv6"
197 " accesslog errorlog certificate encoding")
197 " accesslog errorlog certificate encoding")
198 for o in optlist.split():
198 for o in optlist.split():
199 val = opts.get(o, '')
199 val = opts.get(o, '')
200 if val in (None, ''): # should check against default options instead
200 if val in (None, ''): # should check against default options instead
201 continue
201 continue
202 for u in alluis:
202 for u in alluis:
203 u.setconfig("web", o, val, 'serve')
203 u.setconfig("web", o, val, 'serve')
204
204
205 app = hgweb.createapp(baseui, repo, webconf)
205 app = hgweb.createapp(baseui, repo, webconf)
206 return hgweb.httpservice(servui, app, opts)
206 return hgweb.httpservice(servui, app, opts)
207
207
208 def createservice(ui, repo, opts):
208 def createservice(ui, repo, opts):
209 if opts["cmdserver"]:
209 if opts["cmdserver"]:
210 return _createcmdservice(ui, repo, opts)
210 return _createcmdservice(ui, repo, opts)
211 else:
211 else:
212 return _createhgwebservice(ui, repo, opts)
212 return _createhgwebservice(ui, repo, opts)
General Comments 0
You need to be logged in to leave comments. Login now