##// END OF EJS Templates
commandserver: manually create file objects from socket...
Yuya Nishihara -
r29542:6011ad3b default
parent child Browse files
Show More
@@ -1,663 +1,663
1 # chgserver.py - command server extension for cHg
1 # chgserver.py - command server extension for cHg
2 #
2 #
3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
3 # Copyright 2011 Yuya Nishihara <yuya@tcha.org>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 """command server extension for cHg (EXPERIMENTAL)
8 """command server extension for cHg (EXPERIMENTAL)
9
9
10 'S' channel (read/write)
10 'S' channel (read/write)
11 propagate ui.system() request to client
11 propagate ui.system() request to client
12
12
13 'attachio' command
13 'attachio' command
14 attach client's stdio passed by sendmsg()
14 attach client's stdio passed by sendmsg()
15
15
16 'chdir' command
16 'chdir' command
17 change current directory
17 change current directory
18
18
19 'getpager' command
19 'getpager' command
20 checks if pager is enabled and which pager should be executed
20 checks if pager is enabled and which pager should be executed
21
21
22 'setenv' command
22 'setenv' command
23 replace os.environ completely
23 replace os.environ completely
24
24
25 'setumask' command
25 'setumask' command
26 set umask
26 set umask
27
27
28 'validate' command
28 'validate' command
29 reload the config and check if the server is up to date
29 reload the config and check if the server is up to date
30
30
31 Config
31 Config
32 ------
32 ------
33
33
34 ::
34 ::
35
35
36 [chgserver]
36 [chgserver]
37 idletimeout = 3600 # seconds, after which an idle server will exit
37 idletimeout = 3600 # seconds, after which an idle server will exit
38 skiphash = False # whether to skip config or env change checks
38 skiphash = False # whether to skip config or env change checks
39 """
39 """
40
40
41 from __future__ import absolute_import
41 from __future__ import absolute_import
42
42
43 import errno
43 import errno
44 import hashlib
44 import hashlib
45 import inspect
45 import inspect
46 import os
46 import os
47 import re
47 import re
48 import signal
48 import signal
49 import struct
49 import struct
50 import sys
50 import sys
51 import threading
51 import threading
52 import time
52 import time
53
53
54 from mercurial.i18n import _
54 from mercurial.i18n import _
55
55
56 from mercurial import (
56 from mercurial import (
57 cmdutil,
57 cmdutil,
58 commands,
58 commands,
59 commandserver,
59 commandserver,
60 dispatch,
60 dispatch,
61 error,
61 error,
62 extensions,
62 extensions,
63 osutil,
63 osutil,
64 util,
64 util,
65 )
65 )
66
66
67 socketserver = util.socketserver
67 socketserver = util.socketserver
68
68
69 # Note for extension authors: ONLY specify testedwith = 'internal' for
69 # Note for extension authors: ONLY specify testedwith = 'internal' for
70 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
70 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
71 # be specifying the version(s) of Mercurial they are tested with, or
71 # be specifying the version(s) of Mercurial they are tested with, or
72 # leave the attribute unspecified.
72 # leave the attribute unspecified.
73 testedwith = 'internal'
73 testedwith = 'internal'
74
74
75 _log = commandserver.log
75 _log = commandserver.log
76
76
77 def _hashlist(items):
77 def _hashlist(items):
78 """return sha1 hexdigest for a list"""
78 """return sha1 hexdigest for a list"""
79 return hashlib.sha1(str(items)).hexdigest()
79 return hashlib.sha1(str(items)).hexdigest()
80
80
81 # sensitive config sections affecting confighash
81 # sensitive config sections affecting confighash
82 _configsections = [
82 _configsections = [
83 'alias', # affects global state commands.table
83 'alias', # affects global state commands.table
84 'extdiff', # uisetup will register new commands
84 'extdiff', # uisetup will register new commands
85 'extensions',
85 'extensions',
86 ]
86 ]
87
87
88 # sensitive environment variables affecting confighash
88 # sensitive environment variables affecting confighash
89 _envre = re.compile(r'''\A(?:
89 _envre = re.compile(r'''\A(?:
90 CHGHG
90 CHGHG
91 |HG.*
91 |HG.*
92 |LANG(?:UAGE)?
92 |LANG(?:UAGE)?
93 |LC_.*
93 |LC_.*
94 |LD_.*
94 |LD_.*
95 |PATH
95 |PATH
96 |PYTHON.*
96 |PYTHON.*
97 |TERM(?:INFO)?
97 |TERM(?:INFO)?
98 |TZ
98 |TZ
99 )\Z''', re.X)
99 )\Z''', re.X)
100
100
101 def _confighash(ui):
101 def _confighash(ui):
102 """return a quick hash for detecting config/env changes
102 """return a quick hash for detecting config/env changes
103
103
104 confighash is the hash of sensitive config items and environment variables.
104 confighash is the hash of sensitive config items and environment variables.
105
105
106 for chgserver, it is designed that once confighash changes, the server is
106 for chgserver, it is designed that once confighash changes, the server is
107 not qualified to serve its client and should redirect the client to a new
107 not qualified to serve its client and should redirect the client to a new
108 server. different from mtimehash, confighash change will not mark the
108 server. different from mtimehash, confighash change will not mark the
109 server outdated and exit since the user can have different configs at the
109 server outdated and exit since the user can have different configs at the
110 same time.
110 same time.
111 """
111 """
112 sectionitems = []
112 sectionitems = []
113 for section in _configsections:
113 for section in _configsections:
114 sectionitems.append(ui.configitems(section))
114 sectionitems.append(ui.configitems(section))
115 sectionhash = _hashlist(sectionitems)
115 sectionhash = _hashlist(sectionitems)
116 envitems = [(k, v) for k, v in os.environ.iteritems() if _envre.match(k)]
116 envitems = [(k, v) for k, v in os.environ.iteritems() if _envre.match(k)]
117 envhash = _hashlist(sorted(envitems))
117 envhash = _hashlist(sorted(envitems))
118 return sectionhash[:6] + envhash[:6]
118 return sectionhash[:6] + envhash[:6]
119
119
120 def _getmtimepaths(ui):
120 def _getmtimepaths(ui):
121 """get a list of paths that should be checked to detect change
121 """get a list of paths that should be checked to detect change
122
122
123 The list will include:
123 The list will include:
124 - extensions (will not cover all files for complex extensions)
124 - extensions (will not cover all files for complex extensions)
125 - mercurial/__version__.py
125 - mercurial/__version__.py
126 - python binary
126 - python binary
127 """
127 """
128 modules = [m for n, m in extensions.extensions(ui)]
128 modules = [m for n, m in extensions.extensions(ui)]
129 try:
129 try:
130 from mercurial import __version__
130 from mercurial import __version__
131 modules.append(__version__)
131 modules.append(__version__)
132 except ImportError:
132 except ImportError:
133 pass
133 pass
134 files = [sys.executable]
134 files = [sys.executable]
135 for m in modules:
135 for m in modules:
136 try:
136 try:
137 files.append(inspect.getabsfile(m))
137 files.append(inspect.getabsfile(m))
138 except TypeError:
138 except TypeError:
139 pass
139 pass
140 return sorted(set(files))
140 return sorted(set(files))
141
141
142 def _mtimehash(paths):
142 def _mtimehash(paths):
143 """return a quick hash for detecting file changes
143 """return a quick hash for detecting file changes
144
144
145 mtimehash calls stat on given paths and calculate a hash based on size and
145 mtimehash calls stat on given paths and calculate a hash based on size and
146 mtime of each file. mtimehash does not read file content because reading is
146 mtime of each file. mtimehash does not read file content because reading is
147 expensive. therefore it's not 100% reliable for detecting content changes.
147 expensive. therefore it's not 100% reliable for detecting content changes.
148 it's possible to return different hashes for same file contents.
148 it's possible to return different hashes for same file contents.
149 it's also possible to return a same hash for different file contents for
149 it's also possible to return a same hash for different file contents for
150 some carefully crafted situation.
150 some carefully crafted situation.
151
151
152 for chgserver, it is designed that once mtimehash changes, the server is
152 for chgserver, it is designed that once mtimehash changes, the server is
153 considered outdated immediately and should no longer provide service.
153 considered outdated immediately and should no longer provide service.
154
154
155 mtimehash is not included in confighash because we only know the paths of
155 mtimehash is not included in confighash because we only know the paths of
156 extensions after importing them (there is imp.find_module but that faces
156 extensions after importing them (there is imp.find_module but that faces
157 race conditions). We need to calculate confighash without importing.
157 race conditions). We need to calculate confighash without importing.
158 """
158 """
159 def trystat(path):
159 def trystat(path):
160 try:
160 try:
161 st = os.stat(path)
161 st = os.stat(path)
162 return (st.st_mtime, st.st_size)
162 return (st.st_mtime, st.st_size)
163 except OSError:
163 except OSError:
164 # could be ENOENT, EPERM etc. not fatal in any case
164 # could be ENOENT, EPERM etc. not fatal in any case
165 pass
165 pass
166 return _hashlist(map(trystat, paths))[:12]
166 return _hashlist(map(trystat, paths))[:12]
167
167
168 class hashstate(object):
168 class hashstate(object):
169 """a structure storing confighash, mtimehash, paths used for mtimehash"""
169 """a structure storing confighash, mtimehash, paths used for mtimehash"""
170 def __init__(self, confighash, mtimehash, mtimepaths):
170 def __init__(self, confighash, mtimehash, mtimepaths):
171 self.confighash = confighash
171 self.confighash = confighash
172 self.mtimehash = mtimehash
172 self.mtimehash = mtimehash
173 self.mtimepaths = mtimepaths
173 self.mtimepaths = mtimepaths
174
174
175 @staticmethod
175 @staticmethod
176 def fromui(ui, mtimepaths=None):
176 def fromui(ui, mtimepaths=None):
177 if mtimepaths is None:
177 if mtimepaths is None:
178 mtimepaths = _getmtimepaths(ui)
178 mtimepaths = _getmtimepaths(ui)
179 confighash = _confighash(ui)
179 confighash = _confighash(ui)
180 mtimehash = _mtimehash(mtimepaths)
180 mtimehash = _mtimehash(mtimepaths)
181 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
181 _log('confighash = %s mtimehash = %s\n' % (confighash, mtimehash))
182 return hashstate(confighash, mtimehash, mtimepaths)
182 return hashstate(confighash, mtimehash, mtimepaths)
183
183
184 # copied from hgext/pager.py:uisetup()
184 # copied from hgext/pager.py:uisetup()
185 def _setuppagercmd(ui, options, cmd):
185 def _setuppagercmd(ui, options, cmd):
186 if not ui.formatted():
186 if not ui.formatted():
187 return
187 return
188
188
189 p = ui.config("pager", "pager", os.environ.get("PAGER"))
189 p = ui.config("pager", "pager", os.environ.get("PAGER"))
190 usepager = False
190 usepager = False
191 always = util.parsebool(options['pager'])
191 always = util.parsebool(options['pager'])
192 auto = options['pager'] == 'auto'
192 auto = options['pager'] == 'auto'
193
193
194 if not p:
194 if not p:
195 pass
195 pass
196 elif always:
196 elif always:
197 usepager = True
197 usepager = True
198 elif not auto:
198 elif not auto:
199 usepager = False
199 usepager = False
200 else:
200 else:
201 attended = ['annotate', 'cat', 'diff', 'export', 'glog', 'log', 'qdiff']
201 attended = ['annotate', 'cat', 'diff', 'export', 'glog', 'log', 'qdiff']
202 attend = ui.configlist('pager', 'attend', attended)
202 attend = ui.configlist('pager', 'attend', attended)
203 ignore = ui.configlist('pager', 'ignore')
203 ignore = ui.configlist('pager', 'ignore')
204 cmds, _ = cmdutil.findcmd(cmd, commands.table)
204 cmds, _ = cmdutil.findcmd(cmd, commands.table)
205
205
206 for cmd in cmds:
206 for cmd in cmds:
207 var = 'attend-%s' % cmd
207 var = 'attend-%s' % cmd
208 if ui.config('pager', var):
208 if ui.config('pager', var):
209 usepager = ui.configbool('pager', var)
209 usepager = ui.configbool('pager', var)
210 break
210 break
211 if (cmd in attend or
211 if (cmd in attend or
212 (cmd not in ignore and not attend)):
212 (cmd not in ignore and not attend)):
213 usepager = True
213 usepager = True
214 break
214 break
215
215
216 if usepager:
216 if usepager:
217 ui.setconfig('ui', 'formatted', ui.formatted(), 'pager')
217 ui.setconfig('ui', 'formatted', ui.formatted(), 'pager')
218 ui.setconfig('ui', 'interactive', False, 'pager')
218 ui.setconfig('ui', 'interactive', False, 'pager')
219 return p
219 return p
220
220
221 def _newchgui(srcui, csystem):
221 def _newchgui(srcui, csystem):
222 class chgui(srcui.__class__):
222 class chgui(srcui.__class__):
223 def __init__(self, src=None):
223 def __init__(self, src=None):
224 super(chgui, self).__init__(src)
224 super(chgui, self).__init__(src)
225 if src:
225 if src:
226 self._csystem = getattr(src, '_csystem', csystem)
226 self._csystem = getattr(src, '_csystem', csystem)
227 else:
227 else:
228 self._csystem = csystem
228 self._csystem = csystem
229
229
230 def system(self, cmd, environ=None, cwd=None, onerr=None,
230 def system(self, cmd, environ=None, cwd=None, onerr=None,
231 errprefix=None):
231 errprefix=None):
232 # fallback to the original system method if the output needs to be
232 # fallback to the original system method if the output needs to be
233 # captured (to self._buffers), or the output stream is not stdout
233 # captured (to self._buffers), or the output stream is not stdout
234 # (e.g. stderr, cStringIO), because the chg client is not aware of
234 # (e.g. stderr, cStringIO), because the chg client is not aware of
235 # these situations and will behave differently (write to stdout).
235 # these situations and will behave differently (write to stdout).
236 if (any(s[1] for s in self._bufferstates)
236 if (any(s[1] for s in self._bufferstates)
237 or not util.safehasattr(self.fout, 'fileno')
237 or not util.safehasattr(self.fout, 'fileno')
238 or self.fout.fileno() != sys.stdout.fileno()):
238 or self.fout.fileno() != sys.stdout.fileno()):
239 return super(chgui, self).system(cmd, environ, cwd, onerr,
239 return super(chgui, self).system(cmd, environ, cwd, onerr,
240 errprefix)
240 errprefix)
241 # copied from mercurial/util.py:system()
241 # copied from mercurial/util.py:system()
242 self.flush()
242 self.flush()
243 def py2shell(val):
243 def py2shell(val):
244 if val is None or val is False:
244 if val is None or val is False:
245 return '0'
245 return '0'
246 if val is True:
246 if val is True:
247 return '1'
247 return '1'
248 return str(val)
248 return str(val)
249 env = os.environ.copy()
249 env = os.environ.copy()
250 if environ:
250 if environ:
251 env.update((k, py2shell(v)) for k, v in environ.iteritems())
251 env.update((k, py2shell(v)) for k, v in environ.iteritems())
252 env['HG'] = util.hgexecutable()
252 env['HG'] = util.hgexecutable()
253 rc = self._csystem(cmd, env, cwd)
253 rc = self._csystem(cmd, env, cwd)
254 if rc and onerr:
254 if rc and onerr:
255 errmsg = '%s %s' % (os.path.basename(cmd.split(None, 1)[0]),
255 errmsg = '%s %s' % (os.path.basename(cmd.split(None, 1)[0]),
256 util.explainexit(rc)[0])
256 util.explainexit(rc)[0])
257 if errprefix:
257 if errprefix:
258 errmsg = '%s: %s' % (errprefix, errmsg)
258 errmsg = '%s: %s' % (errprefix, errmsg)
259 raise onerr(errmsg)
259 raise onerr(errmsg)
260 return rc
260 return rc
261
261
262 return chgui(srcui)
262 return chgui(srcui)
263
263
264 def _loadnewui(srcui, args):
264 def _loadnewui(srcui, args):
265 newui = srcui.__class__()
265 newui = srcui.__class__()
266 for a in ['fin', 'fout', 'ferr', 'environ']:
266 for a in ['fin', 'fout', 'ferr', 'environ']:
267 setattr(newui, a, getattr(srcui, a))
267 setattr(newui, a, getattr(srcui, a))
268 if util.safehasattr(srcui, '_csystem'):
268 if util.safehasattr(srcui, '_csystem'):
269 newui._csystem = srcui._csystem
269 newui._csystem = srcui._csystem
270
270
271 # internal config: extensions.chgserver
271 # internal config: extensions.chgserver
272 newui.setconfig('extensions', 'chgserver',
272 newui.setconfig('extensions', 'chgserver',
273 srcui.config('extensions', 'chgserver'), '--config')
273 srcui.config('extensions', 'chgserver'), '--config')
274
274
275 # command line args
275 # command line args
276 args = args[:]
276 args = args[:]
277 dispatch._parseconfig(newui, dispatch._earlygetopt(['--config'], args))
277 dispatch._parseconfig(newui, dispatch._earlygetopt(['--config'], args))
278
278
279 # stolen from tortoisehg.util.copydynamicconfig()
279 # stolen from tortoisehg.util.copydynamicconfig()
280 for section, name, value in srcui.walkconfig():
280 for section, name, value in srcui.walkconfig():
281 source = srcui.configsource(section, name)
281 source = srcui.configsource(section, name)
282 if ':' in source or source == '--config':
282 if ':' in source or source == '--config':
283 # path:line or command line
283 # path:line or command line
284 continue
284 continue
285 if source == 'none':
285 if source == 'none':
286 # ui.configsource returns 'none' by default
286 # ui.configsource returns 'none' by default
287 source = ''
287 source = ''
288 newui.setconfig(section, name, value, source)
288 newui.setconfig(section, name, value, source)
289
289
290 # load wd and repo config, copied from dispatch.py
290 # load wd and repo config, copied from dispatch.py
291 cwds = dispatch._earlygetopt(['--cwd'], args)
291 cwds = dispatch._earlygetopt(['--cwd'], args)
292 cwd = cwds and os.path.realpath(cwds[-1]) or None
292 cwd = cwds and os.path.realpath(cwds[-1]) or None
293 rpath = dispatch._earlygetopt(["-R", "--repository", "--repo"], args)
293 rpath = dispatch._earlygetopt(["-R", "--repository", "--repo"], args)
294 path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)
294 path, newlui = dispatch._getlocal(newui, rpath, wd=cwd)
295
295
296 return (newui, newlui)
296 return (newui, newlui)
297
297
298 class channeledsystem(object):
298 class channeledsystem(object):
299 """Propagate ui.system() request in the following format:
299 """Propagate ui.system() request in the following format:
300
300
301 payload length (unsigned int),
301 payload length (unsigned int),
302 cmd, '\0',
302 cmd, '\0',
303 cwd, '\0',
303 cwd, '\0',
304 envkey, '=', val, '\0',
304 envkey, '=', val, '\0',
305 ...
305 ...
306 envkey, '=', val
306 envkey, '=', val
307
307
308 and waits:
308 and waits:
309
309
310 exitcode length (unsigned int),
310 exitcode length (unsigned int),
311 exitcode (int)
311 exitcode (int)
312 """
312 """
313 def __init__(self, in_, out, channel):
313 def __init__(self, in_, out, channel):
314 self.in_ = in_
314 self.in_ = in_
315 self.out = out
315 self.out = out
316 self.channel = channel
316 self.channel = channel
317
317
318 def __call__(self, cmd, environ, cwd):
318 def __call__(self, cmd, environ, cwd):
319 args = [util.quotecommand(cmd), os.path.abspath(cwd or '.')]
319 args = [util.quotecommand(cmd), os.path.abspath(cwd or '.')]
320 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
320 args.extend('%s=%s' % (k, v) for k, v in environ.iteritems())
321 data = '\0'.join(args)
321 data = '\0'.join(args)
322 self.out.write(struct.pack('>cI', self.channel, len(data)))
322 self.out.write(struct.pack('>cI', self.channel, len(data)))
323 self.out.write(data)
323 self.out.write(data)
324 self.out.flush()
324 self.out.flush()
325
325
326 length = self.in_.read(4)
326 length = self.in_.read(4)
327 length, = struct.unpack('>I', length)
327 length, = struct.unpack('>I', length)
328 if length != 4:
328 if length != 4:
329 raise error.Abort(_('invalid response'))
329 raise error.Abort(_('invalid response'))
330 rc, = struct.unpack('>i', self.in_.read(4))
330 rc, = struct.unpack('>i', self.in_.read(4))
331 return rc
331 return rc
332
332
333 _iochannels = [
333 _iochannels = [
334 # server.ch, ui.fp, mode
334 # server.ch, ui.fp, mode
335 ('cin', 'fin', 'rb'),
335 ('cin', 'fin', 'rb'),
336 ('cout', 'fout', 'wb'),
336 ('cout', 'fout', 'wb'),
337 ('cerr', 'ferr', 'wb'),
337 ('cerr', 'ferr', 'wb'),
338 ]
338 ]
339
339
340 class chgcmdserver(commandserver.server):
340 class chgcmdserver(commandserver.server):
341 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
341 def __init__(self, ui, repo, fin, fout, sock, hashstate, baseaddress):
342 super(chgcmdserver, self).__init__(
342 super(chgcmdserver, self).__init__(
343 _newchgui(ui, channeledsystem(fin, fout, 'S')), repo, fin, fout)
343 _newchgui(ui, channeledsystem(fin, fout, 'S')), repo, fin, fout)
344 self.clientsock = sock
344 self.clientsock = sock
345 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
345 self._oldios = [] # original (self.ch, ui.fp, fd) before "attachio"
346 self.hashstate = hashstate
346 self.hashstate = hashstate
347 self.baseaddress = baseaddress
347 self.baseaddress = baseaddress
348 if hashstate is not None:
348 if hashstate is not None:
349 self.capabilities = self.capabilities.copy()
349 self.capabilities = self.capabilities.copy()
350 self.capabilities['validate'] = chgcmdserver.validate
350 self.capabilities['validate'] = chgcmdserver.validate
351
351
352 def cleanup(self):
352 def cleanup(self):
353 super(chgcmdserver, self).cleanup()
353 super(chgcmdserver, self).cleanup()
354 # dispatch._runcatch() does not flush outputs if exception is not
354 # dispatch._runcatch() does not flush outputs if exception is not
355 # handled by dispatch._dispatch()
355 # handled by dispatch._dispatch()
356 self.ui.flush()
356 self.ui.flush()
357 self._restoreio()
357 self._restoreio()
358
358
359 def attachio(self):
359 def attachio(self):
360 """Attach to client's stdio passed via unix domain socket; all
360 """Attach to client's stdio passed via unix domain socket; all
361 channels except cresult will no longer be used
361 channels except cresult will no longer be used
362 """
362 """
363 # tell client to sendmsg() with 1-byte payload, which makes it
363 # tell client to sendmsg() with 1-byte payload, which makes it
364 # distinctive from "attachio\n" command consumed by client.read()
364 # distinctive from "attachio\n" command consumed by client.read()
365 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
365 self.clientsock.sendall(struct.pack('>cI', 'I', 1))
366 clientfds = osutil.recvfds(self.clientsock.fileno())
366 clientfds = osutil.recvfds(self.clientsock.fileno())
367 _log('received fds: %r\n' % clientfds)
367 _log('received fds: %r\n' % clientfds)
368
368
369 ui = self.ui
369 ui = self.ui
370 ui.flush()
370 ui.flush()
371 first = self._saveio()
371 first = self._saveio()
372 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
372 for fd, (cn, fn, mode) in zip(clientfds, _iochannels):
373 assert fd > 0
373 assert fd > 0
374 fp = getattr(ui, fn)
374 fp = getattr(ui, fn)
375 os.dup2(fd, fp.fileno())
375 os.dup2(fd, fp.fileno())
376 os.close(fd)
376 os.close(fd)
377 if not first:
377 if not first:
378 continue
378 continue
379 # reset buffering mode when client is first attached. as we want
379 # reset buffering mode when client is first attached. as we want
380 # to see output immediately on pager, the mode stays unchanged
380 # to see output immediately on pager, the mode stays unchanged
381 # when client re-attached. ferr is unchanged because it should
381 # when client re-attached. ferr is unchanged because it should
382 # be unbuffered no matter if it is a tty or not.
382 # be unbuffered no matter if it is a tty or not.
383 if fn == 'ferr':
383 if fn == 'ferr':
384 newfp = fp
384 newfp = fp
385 else:
385 else:
386 # make it line buffered explicitly because the default is
386 # make it line buffered explicitly because the default is
387 # decided on first write(), where fout could be a pager.
387 # decided on first write(), where fout could be a pager.
388 if fp.isatty():
388 if fp.isatty():
389 bufsize = 1 # line buffered
389 bufsize = 1 # line buffered
390 else:
390 else:
391 bufsize = -1 # system default
391 bufsize = -1 # system default
392 newfp = os.fdopen(fp.fileno(), mode, bufsize)
392 newfp = os.fdopen(fp.fileno(), mode, bufsize)
393 setattr(ui, fn, newfp)
393 setattr(ui, fn, newfp)
394 setattr(self, cn, newfp)
394 setattr(self, cn, newfp)
395
395
396 self.cresult.write(struct.pack('>i', len(clientfds)))
396 self.cresult.write(struct.pack('>i', len(clientfds)))
397
397
398 def _saveio(self):
398 def _saveio(self):
399 if self._oldios:
399 if self._oldios:
400 return False
400 return False
401 ui = self.ui
401 ui = self.ui
402 for cn, fn, _mode in _iochannels:
402 for cn, fn, _mode in _iochannels:
403 ch = getattr(self, cn)
403 ch = getattr(self, cn)
404 fp = getattr(ui, fn)
404 fp = getattr(ui, fn)
405 fd = os.dup(fp.fileno())
405 fd = os.dup(fp.fileno())
406 self._oldios.append((ch, fp, fd))
406 self._oldios.append((ch, fp, fd))
407 return True
407 return True
408
408
409 def _restoreio(self):
409 def _restoreio(self):
410 ui = self.ui
410 ui = self.ui
411 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
411 for (ch, fp, fd), (cn, fn, _mode) in zip(self._oldios, _iochannels):
412 newfp = getattr(ui, fn)
412 newfp = getattr(ui, fn)
413 # close newfp while it's associated with client; otherwise it
413 # close newfp while it's associated with client; otherwise it
414 # would be closed when newfp is deleted
414 # would be closed when newfp is deleted
415 if newfp is not fp:
415 if newfp is not fp:
416 newfp.close()
416 newfp.close()
417 # restore original fd: fp is open again
417 # restore original fd: fp is open again
418 os.dup2(fd, fp.fileno())
418 os.dup2(fd, fp.fileno())
419 os.close(fd)
419 os.close(fd)
420 setattr(self, cn, ch)
420 setattr(self, cn, ch)
421 setattr(ui, fn, fp)
421 setattr(ui, fn, fp)
422 del self._oldios[:]
422 del self._oldios[:]
423
423
424 def validate(self):
424 def validate(self):
425 """Reload the config and check if the server is up to date
425 """Reload the config and check if the server is up to date
426
426
427 Read a list of '\0' separated arguments.
427 Read a list of '\0' separated arguments.
428 Write a non-empty list of '\0' separated instruction strings or '\0'
428 Write a non-empty list of '\0' separated instruction strings or '\0'
429 if the list is empty.
429 if the list is empty.
430 An instruction string could be either:
430 An instruction string could be either:
431 - "unlink $path", the client should unlink the path to stop the
431 - "unlink $path", the client should unlink the path to stop the
432 outdated server.
432 outdated server.
433 - "redirect $path", the client should attempt to connect to $path
433 - "redirect $path", the client should attempt to connect to $path
434 first. If it does not work, start a new server. It implies
434 first. If it does not work, start a new server. It implies
435 "reconnect".
435 "reconnect".
436 - "exit $n", the client should exit directly with code n.
436 - "exit $n", the client should exit directly with code n.
437 This may happen if we cannot parse the config.
437 This may happen if we cannot parse the config.
438 - "reconnect", the client should close the connection and
438 - "reconnect", the client should close the connection and
439 reconnect.
439 reconnect.
440 If neither "reconnect" nor "redirect" is included in the instruction
440 If neither "reconnect" nor "redirect" is included in the instruction
441 list, the client can continue with this server after completing all
441 list, the client can continue with this server after completing all
442 the instructions.
442 the instructions.
443 """
443 """
444 args = self._readlist()
444 args = self._readlist()
445 try:
445 try:
446 self.ui, lui = _loadnewui(self.ui, args)
446 self.ui, lui = _loadnewui(self.ui, args)
447 except error.ParseError as inst:
447 except error.ParseError as inst:
448 dispatch._formatparse(self.ui.warn, inst)
448 dispatch._formatparse(self.ui.warn, inst)
449 self.ui.flush()
449 self.ui.flush()
450 self.cresult.write('exit 255')
450 self.cresult.write('exit 255')
451 return
451 return
452 newhash = hashstate.fromui(lui, self.hashstate.mtimepaths)
452 newhash = hashstate.fromui(lui, self.hashstate.mtimepaths)
453 insts = []
453 insts = []
454 if newhash.mtimehash != self.hashstate.mtimehash:
454 if newhash.mtimehash != self.hashstate.mtimehash:
455 addr = _hashaddress(self.baseaddress, self.hashstate.confighash)
455 addr = _hashaddress(self.baseaddress, self.hashstate.confighash)
456 insts.append('unlink %s' % addr)
456 insts.append('unlink %s' % addr)
457 # mtimehash is empty if one or more extensions fail to load.
457 # mtimehash is empty if one or more extensions fail to load.
458 # to be compatible with hg, still serve the client this time.
458 # to be compatible with hg, still serve the client this time.
459 if self.hashstate.mtimehash:
459 if self.hashstate.mtimehash:
460 insts.append('reconnect')
460 insts.append('reconnect')
461 if newhash.confighash != self.hashstate.confighash:
461 if newhash.confighash != self.hashstate.confighash:
462 addr = _hashaddress(self.baseaddress, newhash.confighash)
462 addr = _hashaddress(self.baseaddress, newhash.confighash)
463 insts.append('redirect %s' % addr)
463 insts.append('redirect %s' % addr)
464 _log('validate: %s\n' % insts)
464 _log('validate: %s\n' % insts)
465 self.cresult.write('\0'.join(insts) or '\0')
465 self.cresult.write('\0'.join(insts) or '\0')
466
466
467 def chdir(self):
467 def chdir(self):
468 """Change current directory
468 """Change current directory
469
469
470 Note that the behavior of --cwd option is bit different from this.
470 Note that the behavior of --cwd option is bit different from this.
471 It does not affect --config parameter.
471 It does not affect --config parameter.
472 """
472 """
473 path = self._readstr()
473 path = self._readstr()
474 if not path:
474 if not path:
475 return
475 return
476 _log('chdir to %r\n' % path)
476 _log('chdir to %r\n' % path)
477 os.chdir(path)
477 os.chdir(path)
478
478
479 def setumask(self):
479 def setumask(self):
480 """Change umask"""
480 """Change umask"""
481 mask = struct.unpack('>I', self._read(4))[0]
481 mask = struct.unpack('>I', self._read(4))[0]
482 _log('setumask %r\n' % mask)
482 _log('setumask %r\n' % mask)
483 os.umask(mask)
483 os.umask(mask)
484
484
485 def getpager(self):
485 def getpager(self):
486 """Read cmdargs and write pager command to r-channel if enabled
486 """Read cmdargs and write pager command to r-channel if enabled
487
487
488 If pager isn't enabled, this writes '\0' because channeledoutput
488 If pager isn't enabled, this writes '\0' because channeledoutput
489 does not allow to write empty data.
489 does not allow to write empty data.
490 """
490 """
491 args = self._readlist()
491 args = self._readlist()
492 try:
492 try:
493 cmd, _func, args, options, _cmdoptions = dispatch._parse(self.ui,
493 cmd, _func, args, options, _cmdoptions = dispatch._parse(self.ui,
494 args)
494 args)
495 except (error.Abort, error.AmbiguousCommand, error.CommandError,
495 except (error.Abort, error.AmbiguousCommand, error.CommandError,
496 error.UnknownCommand):
496 error.UnknownCommand):
497 cmd = None
497 cmd = None
498 options = {}
498 options = {}
499 if not cmd or 'pager' not in options:
499 if not cmd or 'pager' not in options:
500 self.cresult.write('\0')
500 self.cresult.write('\0')
501 return
501 return
502
502
503 pagercmd = _setuppagercmd(self.ui, options, cmd)
503 pagercmd = _setuppagercmd(self.ui, options, cmd)
504 if pagercmd:
504 if pagercmd:
505 # Python's SIGPIPE is SIG_IGN by default. change to SIG_DFL so
505 # Python's SIGPIPE is SIG_IGN by default. change to SIG_DFL so
506 # we can exit if the pipe to the pager is closed
506 # we can exit if the pipe to the pager is closed
507 if util.safehasattr(signal, 'SIGPIPE') and \
507 if util.safehasattr(signal, 'SIGPIPE') and \
508 signal.getsignal(signal.SIGPIPE) == signal.SIG_IGN:
508 signal.getsignal(signal.SIGPIPE) == signal.SIG_IGN:
509 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
509 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
510 self.cresult.write(pagercmd)
510 self.cresult.write(pagercmd)
511 else:
511 else:
512 self.cresult.write('\0')
512 self.cresult.write('\0')
513
513
514 def setenv(self):
514 def setenv(self):
515 """Clear and update os.environ
515 """Clear and update os.environ
516
516
517 Note that not all variables can make an effect on the running process.
517 Note that not all variables can make an effect on the running process.
518 """
518 """
519 l = self._readlist()
519 l = self._readlist()
520 try:
520 try:
521 newenv = dict(s.split('=', 1) for s in l)
521 newenv = dict(s.split('=', 1) for s in l)
522 except ValueError:
522 except ValueError:
523 raise ValueError('unexpected value in setenv request')
523 raise ValueError('unexpected value in setenv request')
524 _log('setenv: %r\n' % sorted(newenv.keys()))
524 _log('setenv: %r\n' % sorted(newenv.keys()))
525 os.environ.clear()
525 os.environ.clear()
526 os.environ.update(newenv)
526 os.environ.update(newenv)
527
527
528 capabilities = commandserver.server.capabilities.copy()
528 capabilities = commandserver.server.capabilities.copy()
529 capabilities.update({'attachio': attachio,
529 capabilities.update({'attachio': attachio,
530 'chdir': chdir,
530 'chdir': chdir,
531 'getpager': getpager,
531 'getpager': getpager,
532 'setenv': setenv,
532 'setenv': setenv,
533 'setumask': setumask})
533 'setumask': setumask})
534
534
535 class _requesthandler(commandserver._requesthandler):
535 class _requesthandler(commandserver._requesthandler):
536 def _createcmdserver(self):
536 def _createcmdserver(self, conn, fin, fout):
537 ui = self.server.ui
537 ui = self.server.ui
538 repo = self.server.repo
538 repo = self.server.repo
539 return chgcmdserver(ui, repo, self.rfile, self.wfile, self.connection,
539 return chgcmdserver(ui, repo, fin, fout, conn,
540 self.server.hashstate, self.server.baseaddress)
540 self.server.hashstate, self.server.baseaddress)
541
541
542 def _tempaddress(address):
542 def _tempaddress(address):
543 return '%s.%d.tmp' % (address, os.getpid())
543 return '%s.%d.tmp' % (address, os.getpid())
544
544
545 def _hashaddress(address, hashstr):
545 def _hashaddress(address, hashstr):
546 return '%s-%s' % (address, hashstr)
546 return '%s-%s' % (address, hashstr)
547
547
548 class AutoExitMixIn: # use old-style to comply with SocketServer design
548 class AutoExitMixIn: # use old-style to comply with SocketServer design
549 lastactive = time.time()
549 lastactive = time.time()
550 idletimeout = 3600 # default 1 hour
550 idletimeout = 3600 # default 1 hour
551
551
552 def startautoexitthread(self):
552 def startautoexitthread(self):
553 # note: the auto-exit check here is cheap enough to not use a thread,
553 # note: the auto-exit check here is cheap enough to not use a thread,
554 # be done in serve_forever. however SocketServer is hook-unfriendly,
554 # be done in serve_forever. however SocketServer is hook-unfriendly,
555 # you simply cannot hook serve_forever without copying a lot of code.
555 # you simply cannot hook serve_forever without copying a lot of code.
556 # besides, serve_forever's docstring suggests using thread.
556 # besides, serve_forever's docstring suggests using thread.
557 thread = threading.Thread(target=self._autoexitloop)
557 thread = threading.Thread(target=self._autoexitloop)
558 thread.daemon = True
558 thread.daemon = True
559 thread.start()
559 thread.start()
560
560
561 def _autoexitloop(self, interval=1):
561 def _autoexitloop(self, interval=1):
562 while True:
562 while True:
563 time.sleep(interval)
563 time.sleep(interval)
564 if not self.issocketowner():
564 if not self.issocketowner():
565 _log('%s is not owned, exiting.\n' % self.server_address)
565 _log('%s is not owned, exiting.\n' % self.server_address)
566 break
566 break
567 if time.time() - self.lastactive > self.idletimeout:
567 if time.time() - self.lastactive > self.idletimeout:
568 _log('being idle too long. exiting.\n')
568 _log('being idle too long. exiting.\n')
569 break
569 break
570 self.shutdown()
570 self.shutdown()
571
571
572 def process_request(self, request, address):
572 def process_request(self, request, address):
573 self.lastactive = time.time()
573 self.lastactive = time.time()
574 return socketserver.ForkingMixIn.process_request(
574 return socketserver.ForkingMixIn.process_request(
575 self, request, address)
575 self, request, address)
576
576
577 def server_bind(self):
577 def server_bind(self):
578 # use a unique temp address so we can stat the file and do ownership
578 # use a unique temp address so we can stat the file and do ownership
579 # check later
579 # check later
580 tempaddress = _tempaddress(self.server_address)
580 tempaddress = _tempaddress(self.server_address)
581 util.bindunixsocket(self.socket, tempaddress)
581 util.bindunixsocket(self.socket, tempaddress)
582 self._socketstat = os.stat(tempaddress)
582 self._socketstat = os.stat(tempaddress)
583 # rename will replace the old socket file if exists atomically. the
583 # rename will replace the old socket file if exists atomically. the
584 # old server will detect ownership change and exit.
584 # old server will detect ownership change and exit.
585 util.rename(tempaddress, self.server_address)
585 util.rename(tempaddress, self.server_address)
586
586
587 def issocketowner(self):
587 def issocketowner(self):
588 try:
588 try:
589 stat = os.stat(self.server_address)
589 stat = os.stat(self.server_address)
590 return (stat.st_ino == self._socketstat.st_ino and
590 return (stat.st_ino == self._socketstat.st_ino and
591 stat.st_mtime == self._socketstat.st_mtime)
591 stat.st_mtime == self._socketstat.st_mtime)
592 except OSError:
592 except OSError:
593 return False
593 return False
594
594
595 def unlinksocketfile(self):
595 def unlinksocketfile(self):
596 if not self.issocketowner():
596 if not self.issocketowner():
597 return
597 return
598 # it is possible to have a race condition here that we may
598 # it is possible to have a race condition here that we may
599 # remove another server's socket file. but that's okay
599 # remove another server's socket file. but that's okay
600 # since that server will detect and exit automatically and
600 # since that server will detect and exit automatically and
601 # the client will start a new server on demand.
601 # the client will start a new server on demand.
602 try:
602 try:
603 os.unlink(self.server_address)
603 os.unlink(self.server_address)
604 except OSError as exc:
604 except OSError as exc:
605 if exc.errno != errno.ENOENT:
605 if exc.errno != errno.ENOENT:
606 raise
606 raise
607
607
608 class chgunixservice(commandserver.unixservice):
608 class chgunixservice(commandserver.unixservice):
609 def __init__(self, ui, repo, opts):
609 def __init__(self, ui, repo, opts):
610 super(chgunixservice, self).__init__(ui, repo=None, opts=opts)
610 super(chgunixservice, self).__init__(ui, repo=None, opts=opts)
611 if repo:
611 if repo:
612 # one chgserver can serve multiple repos. drop repo infomation
612 # one chgserver can serve multiple repos. drop repo infomation
613 self.ui.setconfig('bundle', 'mainreporoot', '', 'repo')
613 self.ui.setconfig('bundle', 'mainreporoot', '', 'repo')
614
614
615 def init(self):
615 def init(self):
616 self._inithashstate()
616 self._inithashstate()
617 self._checkextensions()
617 self._checkextensions()
618 class cls(AutoExitMixIn, socketserver.ForkingMixIn,
618 class cls(AutoExitMixIn, socketserver.ForkingMixIn,
619 socketserver.UnixStreamServer):
619 socketserver.UnixStreamServer):
620 ui = self.ui
620 ui = self.ui
621 repo = self.repo
621 repo = self.repo
622 hashstate = self.hashstate
622 hashstate = self.hashstate
623 baseaddress = self.baseaddress
623 baseaddress = self.baseaddress
624 self.server = cls(self.address, _requesthandler)
624 self.server = cls(self.address, _requesthandler)
625 self.server.idletimeout = self.ui.configint(
625 self.server.idletimeout = self.ui.configint(
626 'chgserver', 'idletimeout', self.server.idletimeout)
626 'chgserver', 'idletimeout', self.server.idletimeout)
627 self.server.startautoexitthread()
627 self.server.startautoexitthread()
628 self._createsymlink()
628 self._createsymlink()
629
629
630 def _inithashstate(self):
630 def _inithashstate(self):
631 self.baseaddress = self.address
631 self.baseaddress = self.address
632 if self.ui.configbool('chgserver', 'skiphash', False):
632 if self.ui.configbool('chgserver', 'skiphash', False):
633 self.hashstate = None
633 self.hashstate = None
634 return
634 return
635 self.hashstate = hashstate.fromui(self.ui)
635 self.hashstate = hashstate.fromui(self.ui)
636 self.address = _hashaddress(self.address, self.hashstate.confighash)
636 self.address = _hashaddress(self.address, self.hashstate.confighash)
637
637
638 def _checkextensions(self):
638 def _checkextensions(self):
639 if not self.hashstate:
639 if not self.hashstate:
640 return
640 return
641 if extensions.notloaded():
641 if extensions.notloaded():
642 # one or more extensions failed to load. mtimehash becomes
642 # one or more extensions failed to load. mtimehash becomes
643 # meaningless because we do not know the paths of those extensions.
643 # meaningless because we do not know the paths of those extensions.
644 # set mtimehash to an illegal hash value to invalidate the server.
644 # set mtimehash to an illegal hash value to invalidate the server.
645 self.hashstate.mtimehash = ''
645 self.hashstate.mtimehash = ''
646
646
647 def _createsymlink(self):
647 def _createsymlink(self):
648 if self.baseaddress == self.address:
648 if self.baseaddress == self.address:
649 return
649 return
650 tempaddress = _tempaddress(self.baseaddress)
650 tempaddress = _tempaddress(self.baseaddress)
651 os.symlink(os.path.basename(self.address), tempaddress)
651 os.symlink(os.path.basename(self.address), tempaddress)
652 util.rename(tempaddress, self.baseaddress)
652 util.rename(tempaddress, self.baseaddress)
653
653
654 def _cleanup(self):
654 def _cleanup(self):
655 self.server.unlinksocketfile()
655 self.server.unlinksocketfile()
656
656
657 def uisetup(ui):
657 def uisetup(ui):
658 commandserver._servicemap['chgunix'] = chgunixservice
658 commandserver._servicemap['chgunix'] = chgunixservice
659
659
660 # CHGINTERNALMARK is temporarily set by chg client to detect if chg will
660 # CHGINTERNALMARK is temporarily set by chg client to detect if chg will
661 # start another chg. drop it to avoid possible side effects.
661 # start another chg. drop it to avoid possible side effects.
662 if 'CHGINTERNALMARK' in os.environ:
662 if 'CHGINTERNALMARK' in os.environ:
663 del os.environ['CHGINTERNALMARK']
663 del os.environ['CHGINTERNALMARK']
@@ -1,425 +1,435
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Matt Mackall <mpm@selenic.com>
3 # Copyright Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import gc
11 import gc
12 import os
12 import os
13 import random
13 import random
14 import struct
14 import struct
15 import sys
15 import sys
16 import traceback
16 import traceback
17
17
18 from .i18n import _
18 from .i18n import _
19 from . import (
19 from . import (
20 encoding,
20 encoding,
21 error,
21 error,
22 util,
22 util,
23 )
23 )
24
24
25 socketserver = util.socketserver
25 socketserver = util.socketserver
26
26
27 logfile = None
27 logfile = None
28
28
29 def log(*args):
29 def log(*args):
30 if not logfile:
30 if not logfile:
31 return
31 return
32
32
33 for a in args:
33 for a in args:
34 logfile.write(str(a))
34 logfile.write(str(a))
35
35
36 logfile.flush()
36 logfile.flush()
37
37
38 class channeledoutput(object):
38 class channeledoutput(object):
39 """
39 """
40 Write data to out in the following format:
40 Write data to out in the following format:
41
41
42 data length (unsigned int),
42 data length (unsigned int),
43 data
43 data
44 """
44 """
45 def __init__(self, out, channel):
45 def __init__(self, out, channel):
46 self.out = out
46 self.out = out
47 self.channel = channel
47 self.channel = channel
48
48
49 @property
49 @property
50 def name(self):
50 def name(self):
51 return '<%c-channel>' % self.channel
51 return '<%c-channel>' % self.channel
52
52
53 def write(self, data):
53 def write(self, data):
54 if not data:
54 if not data:
55 return
55 return
56 self.out.write(struct.pack('>cI', self.channel, len(data)))
56 self.out.write(struct.pack('>cI', self.channel, len(data)))
57 self.out.write(data)
57 self.out.write(data)
58 self.out.flush()
58 self.out.flush()
59
59
60 def __getattr__(self, attr):
60 def __getattr__(self, attr):
61 if attr in ('isatty', 'fileno', 'tell', 'seek'):
61 if attr in ('isatty', 'fileno', 'tell', 'seek'):
62 raise AttributeError(attr)
62 raise AttributeError(attr)
63 return getattr(self.out, attr)
63 return getattr(self.out, attr)
64
64
65 class channeledinput(object):
65 class channeledinput(object):
66 """
66 """
67 Read data from in_.
67 Read data from in_.
68
68
69 Requests for input are written to out in the following format:
69 Requests for input are written to out in the following format:
70 channel identifier - 'I' for plain input, 'L' line based (1 byte)
70 channel identifier - 'I' for plain input, 'L' line based (1 byte)
71 how many bytes to send at most (unsigned int),
71 how many bytes to send at most (unsigned int),
72
72
73 The client replies with:
73 The client replies with:
74 data length (unsigned int), 0 meaning EOF
74 data length (unsigned int), 0 meaning EOF
75 data
75 data
76 """
76 """
77
77
78 maxchunksize = 4 * 1024
78 maxchunksize = 4 * 1024
79
79
80 def __init__(self, in_, out, channel):
80 def __init__(self, in_, out, channel):
81 self.in_ = in_
81 self.in_ = in_
82 self.out = out
82 self.out = out
83 self.channel = channel
83 self.channel = channel
84
84
85 @property
85 @property
86 def name(self):
86 def name(self):
87 return '<%c-channel>' % self.channel
87 return '<%c-channel>' % self.channel
88
88
89 def read(self, size=-1):
89 def read(self, size=-1):
90 if size < 0:
90 if size < 0:
91 # if we need to consume all the clients input, ask for 4k chunks
91 # if we need to consume all the clients input, ask for 4k chunks
92 # so the pipe doesn't fill up risking a deadlock
92 # so the pipe doesn't fill up risking a deadlock
93 size = self.maxchunksize
93 size = self.maxchunksize
94 s = self._read(size, self.channel)
94 s = self._read(size, self.channel)
95 buf = s
95 buf = s
96 while s:
96 while s:
97 s = self._read(size, self.channel)
97 s = self._read(size, self.channel)
98 buf += s
98 buf += s
99
99
100 return buf
100 return buf
101 else:
101 else:
102 return self._read(size, self.channel)
102 return self._read(size, self.channel)
103
103
104 def _read(self, size, channel):
104 def _read(self, size, channel):
105 if not size:
105 if not size:
106 return ''
106 return ''
107 assert size > 0
107 assert size > 0
108
108
109 # tell the client we need at most size bytes
109 # tell the client we need at most size bytes
110 self.out.write(struct.pack('>cI', channel, size))
110 self.out.write(struct.pack('>cI', channel, size))
111 self.out.flush()
111 self.out.flush()
112
112
113 length = self.in_.read(4)
113 length = self.in_.read(4)
114 length = struct.unpack('>I', length)[0]
114 length = struct.unpack('>I', length)[0]
115 if not length:
115 if not length:
116 return ''
116 return ''
117 else:
117 else:
118 return self.in_.read(length)
118 return self.in_.read(length)
119
119
120 def readline(self, size=-1):
120 def readline(self, size=-1):
121 if size < 0:
121 if size < 0:
122 size = self.maxchunksize
122 size = self.maxchunksize
123 s = self._read(size, 'L')
123 s = self._read(size, 'L')
124 buf = s
124 buf = s
125 # keep asking for more until there's either no more or
125 # keep asking for more until there's either no more or
126 # we got a full line
126 # we got a full line
127 while s and s[-1] != '\n':
127 while s and s[-1] != '\n':
128 s = self._read(size, 'L')
128 s = self._read(size, 'L')
129 buf += s
129 buf += s
130
130
131 return buf
131 return buf
132 else:
132 else:
133 return self._read(size, 'L')
133 return self._read(size, 'L')
134
134
135 def __iter__(self):
135 def __iter__(self):
136 return self
136 return self
137
137
138 def next(self):
138 def next(self):
139 l = self.readline()
139 l = self.readline()
140 if not l:
140 if not l:
141 raise StopIteration
141 raise StopIteration
142 return l
142 return l
143
143
144 def __getattr__(self, attr):
144 def __getattr__(self, attr):
145 if attr in ('isatty', 'fileno', 'tell', 'seek'):
145 if attr in ('isatty', 'fileno', 'tell', 'seek'):
146 raise AttributeError(attr)
146 raise AttributeError(attr)
147 return getattr(self.in_, attr)
147 return getattr(self.in_, attr)
148
148
149 class server(object):
149 class server(object):
150 """
150 """
151 Listens for commands on fin, runs them and writes the output on a channel
151 Listens for commands on fin, runs them and writes the output on a channel
152 based stream to fout.
152 based stream to fout.
153 """
153 """
154 def __init__(self, ui, repo, fin, fout):
154 def __init__(self, ui, repo, fin, fout):
155 self.cwd = os.getcwd()
155 self.cwd = os.getcwd()
156
156
157 # developer config: cmdserver.log
157 # developer config: cmdserver.log
158 logpath = ui.config("cmdserver", "log", None)
158 logpath = ui.config("cmdserver", "log", None)
159 if logpath:
159 if logpath:
160 global logfile
160 global logfile
161 if logpath == '-':
161 if logpath == '-':
162 # write log on a special 'd' (debug) channel
162 # write log on a special 'd' (debug) channel
163 logfile = channeledoutput(fout, 'd')
163 logfile = channeledoutput(fout, 'd')
164 else:
164 else:
165 logfile = open(logpath, 'a')
165 logfile = open(logpath, 'a')
166
166
167 if repo:
167 if repo:
168 # the ui here is really the repo ui so take its baseui so we don't
168 # the ui here is really the repo ui so take its baseui so we don't
169 # end up with its local configuration
169 # end up with its local configuration
170 self.ui = repo.baseui
170 self.ui = repo.baseui
171 self.repo = repo
171 self.repo = repo
172 self.repoui = repo.ui
172 self.repoui = repo.ui
173 else:
173 else:
174 self.ui = ui
174 self.ui = ui
175 self.repo = self.repoui = None
175 self.repo = self.repoui = None
176
176
177 self.cerr = channeledoutput(fout, 'e')
177 self.cerr = channeledoutput(fout, 'e')
178 self.cout = channeledoutput(fout, 'o')
178 self.cout = channeledoutput(fout, 'o')
179 self.cin = channeledinput(fin, fout, 'I')
179 self.cin = channeledinput(fin, fout, 'I')
180 self.cresult = channeledoutput(fout, 'r')
180 self.cresult = channeledoutput(fout, 'r')
181
181
182 self.client = fin
182 self.client = fin
183
183
184 def cleanup(self):
184 def cleanup(self):
185 """release and restore resources taken during server session"""
185 """release and restore resources taken during server session"""
186 pass
186 pass
187
187
188 def _read(self, size):
188 def _read(self, size):
189 if not size:
189 if not size:
190 return ''
190 return ''
191
191
192 data = self.client.read(size)
192 data = self.client.read(size)
193
193
194 # is the other end closed?
194 # is the other end closed?
195 if not data:
195 if not data:
196 raise EOFError
196 raise EOFError
197
197
198 return data
198 return data
199
199
200 def _readstr(self):
200 def _readstr(self):
201 """read a string from the channel
201 """read a string from the channel
202
202
203 format:
203 format:
204 data length (uint32), data
204 data length (uint32), data
205 """
205 """
206 length = struct.unpack('>I', self._read(4))[0]
206 length = struct.unpack('>I', self._read(4))[0]
207 if not length:
207 if not length:
208 return ''
208 return ''
209 return self._read(length)
209 return self._read(length)
210
210
211 def _readlist(self):
211 def _readlist(self):
212 """read a list of NULL separated strings from the channel"""
212 """read a list of NULL separated strings from the channel"""
213 s = self._readstr()
213 s = self._readstr()
214 if s:
214 if s:
215 return s.split('\0')
215 return s.split('\0')
216 else:
216 else:
217 return []
217 return []
218
218
219 def runcommand(self):
219 def runcommand(self):
220 """ reads a list of \0 terminated arguments, executes
220 """ reads a list of \0 terminated arguments, executes
221 and writes the return code to the result channel """
221 and writes the return code to the result channel """
222 from . import dispatch # avoid cycle
222 from . import dispatch # avoid cycle
223
223
224 args = self._readlist()
224 args = self._readlist()
225
225
226 # copy the uis so changes (e.g. --config or --verbose) don't
226 # copy the uis so changes (e.g. --config or --verbose) don't
227 # persist between requests
227 # persist between requests
228 copiedui = self.ui.copy()
228 copiedui = self.ui.copy()
229 uis = [copiedui]
229 uis = [copiedui]
230 if self.repo:
230 if self.repo:
231 self.repo.baseui = copiedui
231 self.repo.baseui = copiedui
232 # clone ui without using ui.copy because this is protected
232 # clone ui without using ui.copy because this is protected
233 repoui = self.repoui.__class__(self.repoui)
233 repoui = self.repoui.__class__(self.repoui)
234 repoui.copy = copiedui.copy # redo copy protection
234 repoui.copy = copiedui.copy # redo copy protection
235 uis.append(repoui)
235 uis.append(repoui)
236 self.repo.ui = self.repo.dirstate._ui = repoui
236 self.repo.ui = self.repo.dirstate._ui = repoui
237 self.repo.invalidateall()
237 self.repo.invalidateall()
238
238
239 for ui in uis:
239 for ui in uis:
240 ui.resetstate()
240 ui.resetstate()
241 # any kind of interaction must use server channels, but chg may
241 # any kind of interaction must use server channels, but chg may
242 # replace channels by fully functional tty files. so nontty is
242 # replace channels by fully functional tty files. so nontty is
243 # enforced only if cin is a channel.
243 # enforced only if cin is a channel.
244 if not util.safehasattr(self.cin, 'fileno'):
244 if not util.safehasattr(self.cin, 'fileno'):
245 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
245 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
246
246
247 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
247 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
248 self.cout, self.cerr)
248 self.cout, self.cerr)
249
249
250 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
250 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
251
251
252 # restore old cwd
252 # restore old cwd
253 if '--cwd' in args:
253 if '--cwd' in args:
254 os.chdir(self.cwd)
254 os.chdir(self.cwd)
255
255
256 self.cresult.write(struct.pack('>i', int(ret)))
256 self.cresult.write(struct.pack('>i', int(ret)))
257
257
258 def getencoding(self):
258 def getencoding(self):
259 """ writes the current encoding to the result channel """
259 """ writes the current encoding to the result channel """
260 self.cresult.write(encoding.encoding)
260 self.cresult.write(encoding.encoding)
261
261
262 def serveone(self):
262 def serveone(self):
263 cmd = self.client.readline()[:-1]
263 cmd = self.client.readline()[:-1]
264 if cmd:
264 if cmd:
265 handler = self.capabilities.get(cmd)
265 handler = self.capabilities.get(cmd)
266 if handler:
266 if handler:
267 handler(self)
267 handler(self)
268 else:
268 else:
269 # clients are expected to check what commands are supported by
269 # clients are expected to check what commands are supported by
270 # looking at the servers capabilities
270 # looking at the servers capabilities
271 raise error.Abort(_('unknown command %s') % cmd)
271 raise error.Abort(_('unknown command %s') % cmd)
272
272
273 return cmd != ''
273 return cmd != ''
274
274
275 capabilities = {'runcommand' : runcommand,
275 capabilities = {'runcommand' : runcommand,
276 'getencoding' : getencoding}
276 'getencoding' : getencoding}
277
277
278 def serve(self):
278 def serve(self):
279 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
279 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
280 hellomsg += '\n'
280 hellomsg += '\n'
281 hellomsg += 'encoding: ' + encoding.encoding
281 hellomsg += 'encoding: ' + encoding.encoding
282 hellomsg += '\n'
282 hellomsg += '\n'
283 hellomsg += 'pid: %d' % util.getpid()
283 hellomsg += 'pid: %d' % util.getpid()
284
284
285 # write the hello msg in -one- chunk
285 # write the hello msg in -one- chunk
286 self.cout.write(hellomsg)
286 self.cout.write(hellomsg)
287
287
288 try:
288 try:
289 while self.serveone():
289 while self.serveone():
290 pass
290 pass
291 except EOFError:
291 except EOFError:
292 # we'll get here if the client disconnected while we were reading
292 # we'll get here if the client disconnected while we were reading
293 # its request
293 # its request
294 return 1
294 return 1
295
295
296 return 0
296 return 0
297
297
298 def _protectio(ui):
298 def _protectio(ui):
299 """ duplicates streams and redirect original to null if ui uses stdio """
299 """ duplicates streams and redirect original to null if ui uses stdio """
300 ui.flush()
300 ui.flush()
301 newfiles = []
301 newfiles = []
302 nullfd = os.open(os.devnull, os.O_RDWR)
302 nullfd = os.open(os.devnull, os.O_RDWR)
303 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
303 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
304 (ui.fout, sys.stdout, 'wb')]:
304 (ui.fout, sys.stdout, 'wb')]:
305 if f is sysf:
305 if f is sysf:
306 newfd = os.dup(f.fileno())
306 newfd = os.dup(f.fileno())
307 os.dup2(nullfd, f.fileno())
307 os.dup2(nullfd, f.fileno())
308 f = os.fdopen(newfd, mode)
308 f = os.fdopen(newfd, mode)
309 newfiles.append(f)
309 newfiles.append(f)
310 os.close(nullfd)
310 os.close(nullfd)
311 return tuple(newfiles)
311 return tuple(newfiles)
312
312
313 def _restoreio(ui, fin, fout):
313 def _restoreio(ui, fin, fout):
314 """ restores streams from duplicated ones """
314 """ restores streams from duplicated ones """
315 ui.flush()
315 ui.flush()
316 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
316 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
317 if f is not uif:
317 if f is not uif:
318 os.dup2(f.fileno(), uif.fileno())
318 os.dup2(f.fileno(), uif.fileno())
319 f.close()
319 f.close()
320
320
321 class pipeservice(object):
321 class pipeservice(object):
322 def __init__(self, ui, repo, opts):
322 def __init__(self, ui, repo, opts):
323 self.ui = ui
323 self.ui = ui
324 self.repo = repo
324 self.repo = repo
325
325
326 def init(self):
326 def init(self):
327 pass
327 pass
328
328
329 def run(self):
329 def run(self):
330 ui = self.ui
330 ui = self.ui
331 # redirect stdio to null device so that broken extensions or in-process
331 # redirect stdio to null device so that broken extensions or in-process
332 # hooks will never cause corruption of channel protocol.
332 # hooks will never cause corruption of channel protocol.
333 fin, fout = _protectio(ui)
333 fin, fout = _protectio(ui)
334 try:
334 try:
335 sv = server(ui, self.repo, fin, fout)
335 sv = server(ui, self.repo, fin, fout)
336 return sv.serve()
336 return sv.serve()
337 finally:
337 finally:
338 sv.cleanup()
338 sv.cleanup()
339 _restoreio(ui, fin, fout)
339 _restoreio(ui, fin, fout)
340
340
341 class _requesthandler(socketserver.StreamRequestHandler):
341 class _requesthandler(socketserver.BaseRequestHandler):
342 def handle(self):
342 def handle(self):
343 # use a different process group from the master process, making this
343 # use a different process group from the master process, making this
344 # process pass kernel "is_current_pgrp_orphaned" check so signals like
344 # process pass kernel "is_current_pgrp_orphaned" check so signals like
345 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
345 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
346 os.setpgid(0, 0)
346 os.setpgid(0, 0)
347 # change random state otherwise forked request handlers would have a
347 # change random state otherwise forked request handlers would have a
348 # same state inherited from parent.
348 # same state inherited from parent.
349 random.seed()
349 random.seed()
350 ui = self.server.ui
350 ui = self.server.ui
351
352 conn = self.request
353 fin = conn.makefile('rb')
354 fout = conn.makefile('wb')
351 sv = None
355 sv = None
352 try:
356 try:
353 sv = self._createcmdserver()
357 sv = self._createcmdserver(conn, fin, fout)
354 try:
358 try:
355 sv.serve()
359 sv.serve()
356 # handle exceptions that may be raised by command server. most of
360 # handle exceptions that may be raised by command server. most of
357 # known exceptions are caught by dispatch.
361 # known exceptions are caught by dispatch.
358 except error.Abort as inst:
362 except error.Abort as inst:
359 ui.warn(_('abort: %s\n') % inst)
363 ui.warn(_('abort: %s\n') % inst)
360 except IOError as inst:
364 except IOError as inst:
361 if inst.errno != errno.EPIPE:
365 if inst.errno != errno.EPIPE:
362 raise
366 raise
363 except KeyboardInterrupt:
367 except KeyboardInterrupt:
364 pass
368 pass
365 finally:
369 finally:
366 sv.cleanup()
370 sv.cleanup()
367 except: # re-raises
371 except: # re-raises
368 # also write traceback to error channel. otherwise client cannot
372 # also write traceback to error channel. otherwise client cannot
369 # see it because it is written to server's stderr by default.
373 # see it because it is written to server's stderr by default.
370 if sv:
374 if sv:
371 cerr = sv.cerr
375 cerr = sv.cerr
372 else:
376 else:
373 cerr = channeledoutput(self.wfile, 'e')
377 cerr = channeledoutput(fout, 'e')
374 traceback.print_exc(file=cerr)
378 traceback.print_exc(file=cerr)
375 raise
379 raise
376 finally:
380 finally:
381 fin.close()
382 try:
383 fout.close() # implicit flush() may cause another EPIPE
384 except IOError as inst:
385 if inst.errno != errno.EPIPE:
386 raise
377 # trigger __del__ since ForkingMixIn uses os._exit
387 # trigger __del__ since ForkingMixIn uses os._exit
378 gc.collect()
388 gc.collect()
379
389
380 def _createcmdserver(self):
390 def _createcmdserver(self, conn, fin, fout):
381 ui = self.server.ui
391 ui = self.server.ui
382 repo = self.server.repo
392 repo = self.server.repo
383 return server(ui, repo, self.rfile, self.wfile)
393 return server(ui, repo, fin, fout)
384
394
385 class unixservice(object):
395 class unixservice(object):
386 """
396 """
387 Listens on unix domain socket and forks server per connection
397 Listens on unix domain socket and forks server per connection
388 """
398 """
389 def __init__(self, ui, repo, opts):
399 def __init__(self, ui, repo, opts):
390 self.ui = ui
400 self.ui = ui
391 self.repo = repo
401 self.repo = repo
392 self.address = opts['address']
402 self.address = opts['address']
393 if not util.safehasattr(socketserver, 'UnixStreamServer'):
403 if not util.safehasattr(socketserver, 'UnixStreamServer'):
394 raise error.Abort(_('unsupported platform'))
404 raise error.Abort(_('unsupported platform'))
395 if not self.address:
405 if not self.address:
396 raise error.Abort(_('no socket path specified with --address'))
406 raise error.Abort(_('no socket path specified with --address'))
397
407
398 def init(self):
408 def init(self):
399 class cls(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
409 class cls(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
400 ui = self.ui
410 ui = self.ui
401 repo = self.repo
411 repo = self.repo
402 self.server = cls(self.address, _requesthandler)
412 self.server = cls(self.address, _requesthandler)
403 self.ui.status(_('listening at %s\n') % self.address)
413 self.ui.status(_('listening at %s\n') % self.address)
404 self.ui.flush() # avoid buffering of status message
414 self.ui.flush() # avoid buffering of status message
405
415
406 def _cleanup(self):
416 def _cleanup(self):
407 os.unlink(self.address)
417 os.unlink(self.address)
408
418
409 def run(self):
419 def run(self):
410 try:
420 try:
411 self.server.serve_forever()
421 self.server.serve_forever()
412 finally:
422 finally:
413 self._cleanup()
423 self._cleanup()
414
424
415 _servicemap = {
425 _servicemap = {
416 'pipe': pipeservice,
426 'pipe': pipeservice,
417 'unix': unixservice,
427 'unix': unixservice,
418 }
428 }
419
429
420 def createservice(ui, repo, opts):
430 def createservice(ui, repo, opts):
421 mode = opts['cmdserver']
431 mode = opts['cmdserver']
422 try:
432 try:
423 return _servicemap[mode](ui, repo, opts)
433 return _servicemap[mode](ui, repo, opts)
424 except KeyError:
434 except KeyError:
425 raise error.Abort(_('unknown mode %s') % mode)
435 raise error.Abort(_('unknown mode %s') % mode)
General Comments 0
You need to be logged in to leave comments. Login now