##// END OF EJS Templates
commandserver: enable logging when server process started...
Yuya Nishihara -
r40858:368ecbf7 default
parent child Browse files
Show More
@@ -1,600 +1,613 b''
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Matt Mackall <mpm@selenic.com>
3 # Copyright Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import gc
11 import gc
12 import os
12 import os
13 import random
13 import random
14 import signal
14 import signal
15 import socket
15 import socket
16 import struct
16 import struct
17 import traceback
17 import traceback
18
18
19 try:
19 try:
20 import selectors
20 import selectors
21 selectors.BaseSelector
21 selectors.BaseSelector
22 except ImportError:
22 except ImportError:
23 from .thirdparty import selectors2 as selectors
23 from .thirdparty import selectors2 as selectors
24
24
25 from .i18n import _
25 from .i18n import _
26 from . import (
26 from . import (
27 encoding,
27 encoding,
28 error,
28 error,
29 pycompat,
29 pycompat,
30 util,
30 util,
31 )
31 )
32 from .utils import (
32 from .utils import (
33 cborutil,
33 cborutil,
34 procutil,
34 procutil,
35 )
35 )
36
36
37 logfile = None
37 logfile = None
38
38
39 def log(*args):
39 def log(*args):
40 if not logfile:
40 if not logfile:
41 return
41 return
42
42
43 for a in args:
43 for a in args:
44 logfile.write(str(a))
44 logfile.write(str(a))
45
45
46 logfile.flush()
46 logfile.flush()
47
47
48 class channeledoutput(object):
48 class channeledoutput(object):
49 """
49 """
50 Write data to out in the following format:
50 Write data to out in the following format:
51
51
52 data length (unsigned int),
52 data length (unsigned int),
53 data
53 data
54 """
54 """
55 def __init__(self, out, channel):
55 def __init__(self, out, channel):
56 self.out = out
56 self.out = out
57 self.channel = channel
57 self.channel = channel
58
58
59 @property
59 @property
60 def name(self):
60 def name(self):
61 return '<%c-channel>' % self.channel
61 return '<%c-channel>' % self.channel
62
62
63 def write(self, data):
63 def write(self, data):
64 if not data:
64 if not data:
65 return
65 return
66 # single write() to guarantee the same atomicity as the underlying file
66 # single write() to guarantee the same atomicity as the underlying file
67 self.out.write(struct.pack('>cI', self.channel, len(data)) + data)
67 self.out.write(struct.pack('>cI', self.channel, len(data)) + data)
68 self.out.flush()
68 self.out.flush()
69
69
70 def __getattr__(self, attr):
70 def __getattr__(self, attr):
71 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
71 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
72 raise AttributeError(attr)
72 raise AttributeError(attr)
73 return getattr(self.out, attr)
73 return getattr(self.out, attr)
74
74
75 class channeledmessage(object):
75 class channeledmessage(object):
76 """
76 """
77 Write encoded message and metadata to out in the following format:
77 Write encoded message and metadata to out in the following format:
78
78
79 data length (unsigned int),
79 data length (unsigned int),
80 encoded message and metadata, as a flat key-value dict.
80 encoded message and metadata, as a flat key-value dict.
81
81
82 Each message should have 'type' attribute. Messages of unknown type
82 Each message should have 'type' attribute. Messages of unknown type
83 should be ignored.
83 should be ignored.
84 """
84 """
85
85
86 # teach ui that write() can take **opts
86 # teach ui that write() can take **opts
87 structured = True
87 structured = True
88
88
89 def __init__(self, out, channel, encodename, encodefn):
89 def __init__(self, out, channel, encodename, encodefn):
90 self._cout = channeledoutput(out, channel)
90 self._cout = channeledoutput(out, channel)
91 self.encoding = encodename
91 self.encoding = encodename
92 self._encodefn = encodefn
92 self._encodefn = encodefn
93
93
94 def write(self, data, **opts):
94 def write(self, data, **opts):
95 opts = pycompat.byteskwargs(opts)
95 opts = pycompat.byteskwargs(opts)
96 if data is not None:
96 if data is not None:
97 opts[b'data'] = data
97 opts[b'data'] = data
98 self._cout.write(self._encodefn(opts))
98 self._cout.write(self._encodefn(opts))
99
99
100 def __getattr__(self, attr):
100 def __getattr__(self, attr):
101 return getattr(self._cout, attr)
101 return getattr(self._cout, attr)
102
102
103 class channeledinput(object):
103 class channeledinput(object):
104 """
104 """
105 Read data from in_.
105 Read data from in_.
106
106
107 Requests for input are written to out in the following format:
107 Requests for input are written to out in the following format:
108 channel identifier - 'I' for plain input, 'L' line based (1 byte)
108 channel identifier - 'I' for plain input, 'L' line based (1 byte)
109 how many bytes to send at most (unsigned int),
109 how many bytes to send at most (unsigned int),
110
110
111 The client replies with:
111 The client replies with:
112 data length (unsigned int), 0 meaning EOF
112 data length (unsigned int), 0 meaning EOF
113 data
113 data
114 """
114 """
115
115
116 maxchunksize = 4 * 1024
116 maxchunksize = 4 * 1024
117
117
118 def __init__(self, in_, out, channel):
118 def __init__(self, in_, out, channel):
119 self.in_ = in_
119 self.in_ = in_
120 self.out = out
120 self.out = out
121 self.channel = channel
121 self.channel = channel
122
122
123 @property
123 @property
124 def name(self):
124 def name(self):
125 return '<%c-channel>' % self.channel
125 return '<%c-channel>' % self.channel
126
126
127 def read(self, size=-1):
127 def read(self, size=-1):
128 if size < 0:
128 if size < 0:
129 # if we need to consume all the clients input, ask for 4k chunks
129 # if we need to consume all the clients input, ask for 4k chunks
130 # so the pipe doesn't fill up risking a deadlock
130 # so the pipe doesn't fill up risking a deadlock
131 size = self.maxchunksize
131 size = self.maxchunksize
132 s = self._read(size, self.channel)
132 s = self._read(size, self.channel)
133 buf = s
133 buf = s
134 while s:
134 while s:
135 s = self._read(size, self.channel)
135 s = self._read(size, self.channel)
136 buf += s
136 buf += s
137
137
138 return buf
138 return buf
139 else:
139 else:
140 return self._read(size, self.channel)
140 return self._read(size, self.channel)
141
141
142 def _read(self, size, channel):
142 def _read(self, size, channel):
143 if not size:
143 if not size:
144 return ''
144 return ''
145 assert size > 0
145 assert size > 0
146
146
147 # tell the client we need at most size bytes
147 # tell the client we need at most size bytes
148 self.out.write(struct.pack('>cI', channel, size))
148 self.out.write(struct.pack('>cI', channel, size))
149 self.out.flush()
149 self.out.flush()
150
150
151 length = self.in_.read(4)
151 length = self.in_.read(4)
152 length = struct.unpack('>I', length)[0]
152 length = struct.unpack('>I', length)[0]
153 if not length:
153 if not length:
154 return ''
154 return ''
155 else:
155 else:
156 return self.in_.read(length)
156 return self.in_.read(length)
157
157
158 def readline(self, size=-1):
158 def readline(self, size=-1):
159 if size < 0:
159 if size < 0:
160 size = self.maxchunksize
160 size = self.maxchunksize
161 s = self._read(size, 'L')
161 s = self._read(size, 'L')
162 buf = s
162 buf = s
163 # keep asking for more until there's either no more or
163 # keep asking for more until there's either no more or
164 # we got a full line
164 # we got a full line
165 while s and s[-1] != '\n':
165 while s and s[-1] != '\n':
166 s = self._read(size, 'L')
166 s = self._read(size, 'L')
167 buf += s
167 buf += s
168
168
169 return buf
169 return buf
170 else:
170 else:
171 return self._read(size, 'L')
171 return self._read(size, 'L')
172
172
173 def __iter__(self):
173 def __iter__(self):
174 return self
174 return self
175
175
176 def next(self):
176 def next(self):
177 l = self.readline()
177 l = self.readline()
178 if not l:
178 if not l:
179 raise StopIteration
179 raise StopIteration
180 return l
180 return l
181
181
182 __next__ = next
182 __next__ = next
183
183
184 def __getattr__(self, attr):
184 def __getattr__(self, attr):
185 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
185 if attr in (r'isatty', r'fileno', r'tell', r'seek'):
186 raise AttributeError(attr)
186 raise AttributeError(attr)
187 return getattr(self.in_, attr)
187 return getattr(self.in_, attr)
188
188
189 _messageencoders = {
189 _messageencoders = {
190 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
190 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
191 }
191 }
192
192
193 def _selectmessageencoder(ui):
193 def _selectmessageencoder(ui):
194 # experimental config: cmdserver.message-encodings
194 # experimental config: cmdserver.message-encodings
195 encnames = ui.configlist(b'cmdserver', b'message-encodings')
195 encnames = ui.configlist(b'cmdserver', b'message-encodings')
196 for n in encnames:
196 for n in encnames:
197 f = _messageencoders.get(n)
197 f = _messageencoders.get(n)
198 if f:
198 if f:
199 return n, f
199 return n, f
200 raise error.Abort(b'no supported message encodings: %s'
200 raise error.Abort(b'no supported message encodings: %s'
201 % b' '.join(encnames))
201 % b' '.join(encnames))
202
202
203 class server(object):
203 class server(object):
204 """
204 """
205 Listens for commands on fin, runs them and writes the output on a channel
205 Listens for commands on fin, runs them and writes the output on a channel
206 based stream to fout.
206 based stream to fout.
207 """
207 """
208 def __init__(self, ui, repo, fin, fout):
208 def __init__(self, ui, repo, fin, fout):
209 self.cwd = encoding.getcwd()
209 self.cwd = encoding.getcwd()
210
210
211 # developer config: cmdserver.log
211 if ui.config("cmdserver", "log") == '-':
212 logpath = ui.config("cmdserver", "log")
213 if logpath:
214 global logfile
212 global logfile
215 if logpath == '-':
213 # switch log stream to the 'd' (debug) channel
216 # write log on a special 'd' (debug) channel
214 logfile = channeledoutput(fout, 'd')
217 logfile = channeledoutput(fout, 'd')
218 else:
219 logfile = open(logpath, 'a')
220
215
221 if repo:
216 if repo:
222 # the ui here is really the repo ui so take its baseui so we don't
217 # the ui here is really the repo ui so take its baseui so we don't
223 # end up with its local configuration
218 # end up with its local configuration
224 self.ui = repo.baseui
219 self.ui = repo.baseui
225 self.repo = repo
220 self.repo = repo
226 self.repoui = repo.ui
221 self.repoui = repo.ui
227 else:
222 else:
228 self.ui = ui
223 self.ui = ui
229 self.repo = self.repoui = None
224 self.repo = self.repoui = None
230
225
231 self.cerr = channeledoutput(fout, 'e')
226 self.cerr = channeledoutput(fout, 'e')
232 self.cout = channeledoutput(fout, 'o')
227 self.cout = channeledoutput(fout, 'o')
233 self.cin = channeledinput(fin, fout, 'I')
228 self.cin = channeledinput(fin, fout, 'I')
234 self.cresult = channeledoutput(fout, 'r')
229 self.cresult = channeledoutput(fout, 'r')
235
230
236 # TODO: add this to help/config.txt when stabilized
231 # TODO: add this to help/config.txt when stabilized
237 # ``channel``
232 # ``channel``
238 # Use separate channel for structured output. (Command-server only)
233 # Use separate channel for structured output. (Command-server only)
239 self.cmsg = None
234 self.cmsg = None
240 if ui.config(b'ui', b'message-output') == b'channel':
235 if ui.config(b'ui', b'message-output') == b'channel':
241 encname, encfn = _selectmessageencoder(ui)
236 encname, encfn = _selectmessageencoder(ui)
242 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
237 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
243
238
244 self.client = fin
239 self.client = fin
245
240
246 def cleanup(self):
241 def cleanup(self):
247 """release and restore resources taken during server session"""
242 """release and restore resources taken during server session"""
248
243
249 def _read(self, size):
244 def _read(self, size):
250 if not size:
245 if not size:
251 return ''
246 return ''
252
247
253 data = self.client.read(size)
248 data = self.client.read(size)
254
249
255 # is the other end closed?
250 # is the other end closed?
256 if not data:
251 if not data:
257 raise EOFError
252 raise EOFError
258
253
259 return data
254 return data
260
255
261 def _readstr(self):
256 def _readstr(self):
262 """read a string from the channel
257 """read a string from the channel
263
258
264 format:
259 format:
265 data length (uint32), data
260 data length (uint32), data
266 """
261 """
267 length = struct.unpack('>I', self._read(4))[0]
262 length = struct.unpack('>I', self._read(4))[0]
268 if not length:
263 if not length:
269 return ''
264 return ''
270 return self._read(length)
265 return self._read(length)
271
266
272 def _readlist(self):
267 def _readlist(self):
273 """read a list of NULL separated strings from the channel"""
268 """read a list of NULL separated strings from the channel"""
274 s = self._readstr()
269 s = self._readstr()
275 if s:
270 if s:
276 return s.split('\0')
271 return s.split('\0')
277 else:
272 else:
278 return []
273 return []
279
274
280 def runcommand(self):
275 def runcommand(self):
281 """ reads a list of \0 terminated arguments, executes
276 """ reads a list of \0 terminated arguments, executes
282 and writes the return code to the result channel """
277 and writes the return code to the result channel """
283 from . import dispatch # avoid cycle
278 from . import dispatch # avoid cycle
284
279
285 args = self._readlist()
280 args = self._readlist()
286
281
287 # copy the uis so changes (e.g. --config or --verbose) don't
282 # copy the uis so changes (e.g. --config or --verbose) don't
288 # persist between requests
283 # persist between requests
289 copiedui = self.ui.copy()
284 copiedui = self.ui.copy()
290 uis = [copiedui]
285 uis = [copiedui]
291 if self.repo:
286 if self.repo:
292 self.repo.baseui = copiedui
287 self.repo.baseui = copiedui
293 # clone ui without using ui.copy because this is protected
288 # clone ui without using ui.copy because this is protected
294 repoui = self.repoui.__class__(self.repoui)
289 repoui = self.repoui.__class__(self.repoui)
295 repoui.copy = copiedui.copy # redo copy protection
290 repoui.copy = copiedui.copy # redo copy protection
296 uis.append(repoui)
291 uis.append(repoui)
297 self.repo.ui = self.repo.dirstate._ui = repoui
292 self.repo.ui = self.repo.dirstate._ui = repoui
298 self.repo.invalidateall()
293 self.repo.invalidateall()
299
294
300 for ui in uis:
295 for ui in uis:
301 ui.resetstate()
296 ui.resetstate()
302 # any kind of interaction must use server channels, but chg may
297 # any kind of interaction must use server channels, but chg may
303 # replace channels by fully functional tty files. so nontty is
298 # replace channels by fully functional tty files. so nontty is
304 # enforced only if cin is a channel.
299 # enforced only if cin is a channel.
305 if not util.safehasattr(self.cin, 'fileno'):
300 if not util.safehasattr(self.cin, 'fileno'):
306 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
301 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
307
302
308 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
303 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
309 self.cout, self.cerr, self.cmsg)
304 self.cout, self.cerr, self.cmsg)
310
305
311 try:
306 try:
312 ret = dispatch.dispatch(req) & 255
307 ret = dispatch.dispatch(req) & 255
313 self.cresult.write(struct.pack('>i', int(ret)))
308 self.cresult.write(struct.pack('>i', int(ret)))
314 finally:
309 finally:
315 # restore old cwd
310 # restore old cwd
316 if '--cwd' in args:
311 if '--cwd' in args:
317 os.chdir(self.cwd)
312 os.chdir(self.cwd)
318
313
319 def getencoding(self):
314 def getencoding(self):
320 """ writes the current encoding to the result channel """
315 """ writes the current encoding to the result channel """
321 self.cresult.write(encoding.encoding)
316 self.cresult.write(encoding.encoding)
322
317
323 def serveone(self):
318 def serveone(self):
324 cmd = self.client.readline()[:-1]
319 cmd = self.client.readline()[:-1]
325 if cmd:
320 if cmd:
326 handler = self.capabilities.get(cmd)
321 handler = self.capabilities.get(cmd)
327 if handler:
322 if handler:
328 handler(self)
323 handler(self)
329 else:
324 else:
330 # clients are expected to check what commands are supported by
325 # clients are expected to check what commands are supported by
331 # looking at the servers capabilities
326 # looking at the servers capabilities
332 raise error.Abort(_('unknown command %s') % cmd)
327 raise error.Abort(_('unknown command %s') % cmd)
333
328
334 return cmd != ''
329 return cmd != ''
335
330
336 capabilities = {'runcommand': runcommand,
331 capabilities = {'runcommand': runcommand,
337 'getencoding': getencoding}
332 'getencoding': getencoding}
338
333
339 def serve(self):
334 def serve(self):
340 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
335 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
341 hellomsg += '\n'
336 hellomsg += '\n'
342 hellomsg += 'encoding: ' + encoding.encoding
337 hellomsg += 'encoding: ' + encoding.encoding
343 hellomsg += '\n'
338 hellomsg += '\n'
344 if self.cmsg:
339 if self.cmsg:
345 hellomsg += 'message-encoding: %s\n' % self.cmsg.encoding
340 hellomsg += 'message-encoding: %s\n' % self.cmsg.encoding
346 hellomsg += 'pid: %d' % procutil.getpid()
341 hellomsg += 'pid: %d' % procutil.getpid()
347 if util.safehasattr(os, 'getpgid'):
342 if util.safehasattr(os, 'getpgid'):
348 hellomsg += '\n'
343 hellomsg += '\n'
349 hellomsg += 'pgid: %d' % os.getpgid(0)
344 hellomsg += 'pgid: %d' % os.getpgid(0)
350
345
351 # write the hello msg in -one- chunk
346 # write the hello msg in -one- chunk
352 self.cout.write(hellomsg)
347 self.cout.write(hellomsg)
353
348
354 try:
349 try:
355 while self.serveone():
350 while self.serveone():
356 pass
351 pass
357 except EOFError:
352 except EOFError:
358 # we'll get here if the client disconnected while we were reading
353 # we'll get here if the client disconnected while we were reading
359 # its request
354 # its request
360 return 1
355 return 1
361
356
362 return 0
357 return 0
363
358
359 def setuplogging(ui):
360 """Set up server logging facility
361
362 If cmdserver.log is '-', log messages will be sent to the 'd' channel
363 while a client is connected. Otherwise, messages will be written to
364 the stderr of the server process.
365 """
366 # developer config: cmdserver.log
367 logpath = ui.config(b'cmdserver', b'log')
368 if not logpath:
369 return
370
371 global logfile
372 if logpath == b'-':
373 logfile = ui.ferr
374 else:
375 logfile = open(logpath, 'ab')
376
364 class pipeservice(object):
377 class pipeservice(object):
365 def __init__(self, ui, repo, opts):
378 def __init__(self, ui, repo, opts):
366 self.ui = ui
379 self.ui = ui
367 self.repo = repo
380 self.repo = repo
368
381
369 def init(self):
382 def init(self):
370 pass
383 pass
371
384
372 def run(self):
385 def run(self):
373 ui = self.ui
386 ui = self.ui
374 # redirect stdio to null device so that broken extensions or in-process
387 # redirect stdio to null device so that broken extensions or in-process
375 # hooks will never cause corruption of channel protocol.
388 # hooks will never cause corruption of channel protocol.
376 with procutil.protectedstdio(ui.fin, ui.fout) as (fin, fout):
389 with procutil.protectedstdio(ui.fin, ui.fout) as (fin, fout):
377 sv = server(ui, self.repo, fin, fout)
390 sv = server(ui, self.repo, fin, fout)
378 try:
391 try:
379 return sv.serve()
392 return sv.serve()
380 finally:
393 finally:
381 sv.cleanup()
394 sv.cleanup()
382
395
383 def _initworkerprocess():
396 def _initworkerprocess():
384 # use a different process group from the master process, in order to:
397 # use a different process group from the master process, in order to:
385 # 1. make the current process group no longer "orphaned" (because the
398 # 1. make the current process group no longer "orphaned" (because the
386 # parent of this process is in a different process group while
399 # parent of this process is in a different process group while
387 # remains in a same session)
400 # remains in a same session)
388 # according to POSIX 2.2.2.52, orphaned process group will ignore
401 # according to POSIX 2.2.2.52, orphaned process group will ignore
389 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
402 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
390 # cause trouble for things like ncurses.
403 # cause trouble for things like ncurses.
391 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
404 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
392 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
405 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
393 # processes like ssh will be killed properly, without affecting
406 # processes like ssh will be killed properly, without affecting
394 # unrelated processes.
407 # unrelated processes.
395 os.setpgid(0, 0)
408 os.setpgid(0, 0)
396 # change random state otherwise forked request handlers would have a
409 # change random state otherwise forked request handlers would have a
397 # same state inherited from parent.
410 # same state inherited from parent.
398 random.seed()
411 random.seed()
399
412
400 def _serverequest(ui, repo, conn, createcmdserver):
413 def _serverequest(ui, repo, conn, createcmdserver):
401 fin = conn.makefile(r'rb')
414 fin = conn.makefile(r'rb')
402 fout = conn.makefile(r'wb')
415 fout = conn.makefile(r'wb')
403 sv = None
416 sv = None
404 try:
417 try:
405 sv = createcmdserver(repo, conn, fin, fout)
418 sv = createcmdserver(repo, conn, fin, fout)
406 try:
419 try:
407 sv.serve()
420 sv.serve()
408 # handle exceptions that may be raised by command server. most of
421 # handle exceptions that may be raised by command server. most of
409 # known exceptions are caught by dispatch.
422 # known exceptions are caught by dispatch.
410 except error.Abort as inst:
423 except error.Abort as inst:
411 ui.error(_('abort: %s\n') % inst)
424 ui.error(_('abort: %s\n') % inst)
412 except IOError as inst:
425 except IOError as inst:
413 if inst.errno != errno.EPIPE:
426 if inst.errno != errno.EPIPE:
414 raise
427 raise
415 except KeyboardInterrupt:
428 except KeyboardInterrupt:
416 pass
429 pass
417 finally:
430 finally:
418 sv.cleanup()
431 sv.cleanup()
419 except: # re-raises
432 except: # re-raises
420 # also write traceback to error channel. otherwise client cannot
433 # also write traceback to error channel. otherwise client cannot
421 # see it because it is written to server's stderr by default.
434 # see it because it is written to server's stderr by default.
422 if sv:
435 if sv:
423 cerr = sv.cerr
436 cerr = sv.cerr
424 else:
437 else:
425 cerr = channeledoutput(fout, 'e')
438 cerr = channeledoutput(fout, 'e')
426 cerr.write(encoding.strtolocal(traceback.format_exc()))
439 cerr.write(encoding.strtolocal(traceback.format_exc()))
427 raise
440 raise
428 finally:
441 finally:
429 fin.close()
442 fin.close()
430 try:
443 try:
431 fout.close() # implicit flush() may cause another EPIPE
444 fout.close() # implicit flush() may cause another EPIPE
432 except IOError as inst:
445 except IOError as inst:
433 if inst.errno != errno.EPIPE:
446 if inst.errno != errno.EPIPE:
434 raise
447 raise
435
448
436 class unixservicehandler(object):
449 class unixservicehandler(object):
437 """Set of pluggable operations for unix-mode services
450 """Set of pluggable operations for unix-mode services
438
451
439 Almost all methods except for createcmdserver() are called in the main
452 Almost all methods except for createcmdserver() are called in the main
440 process. You can't pass mutable resource back from createcmdserver().
453 process. You can't pass mutable resource back from createcmdserver().
441 """
454 """
442
455
443 pollinterval = None
456 pollinterval = None
444
457
445 def __init__(self, ui):
458 def __init__(self, ui):
446 self.ui = ui
459 self.ui = ui
447
460
448 def bindsocket(self, sock, address):
461 def bindsocket(self, sock, address):
449 util.bindunixsocket(sock, address)
462 util.bindunixsocket(sock, address)
450 sock.listen(socket.SOMAXCONN)
463 sock.listen(socket.SOMAXCONN)
451 self.ui.status(_('listening at %s\n') % address)
464 self.ui.status(_('listening at %s\n') % address)
452 self.ui.flush() # avoid buffering of status message
465 self.ui.flush() # avoid buffering of status message
453
466
454 def unlinksocket(self, address):
467 def unlinksocket(self, address):
455 os.unlink(address)
468 os.unlink(address)
456
469
457 def shouldexit(self):
470 def shouldexit(self):
458 """True if server should shut down; checked per pollinterval"""
471 """True if server should shut down; checked per pollinterval"""
459 return False
472 return False
460
473
461 def newconnection(self):
474 def newconnection(self):
462 """Called when main process notices new connection"""
475 """Called when main process notices new connection"""
463
476
464 def createcmdserver(self, repo, conn, fin, fout):
477 def createcmdserver(self, repo, conn, fin, fout):
465 """Create new command server instance; called in the process that
478 """Create new command server instance; called in the process that
466 serves for the current connection"""
479 serves for the current connection"""
467 return server(self.ui, repo, fin, fout)
480 return server(self.ui, repo, fin, fout)
468
481
469 class unixforkingservice(object):
482 class unixforkingservice(object):
470 """
483 """
471 Listens on unix domain socket and forks server per connection
484 Listens on unix domain socket and forks server per connection
472 """
485 """
473
486
474 def __init__(self, ui, repo, opts, handler=None):
487 def __init__(self, ui, repo, opts, handler=None):
475 self.ui = ui
488 self.ui = ui
476 self.repo = repo
489 self.repo = repo
477 self.address = opts['address']
490 self.address = opts['address']
478 if not util.safehasattr(socket, 'AF_UNIX'):
491 if not util.safehasattr(socket, 'AF_UNIX'):
479 raise error.Abort(_('unsupported platform'))
492 raise error.Abort(_('unsupported platform'))
480 if not self.address:
493 if not self.address:
481 raise error.Abort(_('no socket path specified with --address'))
494 raise error.Abort(_('no socket path specified with --address'))
482 self._servicehandler = handler or unixservicehandler(ui)
495 self._servicehandler = handler or unixservicehandler(ui)
483 self._sock = None
496 self._sock = None
484 self._oldsigchldhandler = None
497 self._oldsigchldhandler = None
485 self._workerpids = set() # updated by signal handler; do not iterate
498 self._workerpids = set() # updated by signal handler; do not iterate
486 self._socketunlinked = None
499 self._socketunlinked = None
487
500
488 def init(self):
501 def init(self):
489 self._sock = socket.socket(socket.AF_UNIX)
502 self._sock = socket.socket(socket.AF_UNIX)
490 self._servicehandler.bindsocket(self._sock, self.address)
503 self._servicehandler.bindsocket(self._sock, self.address)
491 if util.safehasattr(procutil, 'unblocksignal'):
504 if util.safehasattr(procutil, 'unblocksignal'):
492 procutil.unblocksignal(signal.SIGCHLD)
505 procutil.unblocksignal(signal.SIGCHLD)
493 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
506 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
494 self._oldsigchldhandler = o
507 self._oldsigchldhandler = o
495 self._socketunlinked = False
508 self._socketunlinked = False
496
509
497 def _unlinksocket(self):
510 def _unlinksocket(self):
498 if not self._socketunlinked:
511 if not self._socketunlinked:
499 self._servicehandler.unlinksocket(self.address)
512 self._servicehandler.unlinksocket(self.address)
500 self._socketunlinked = True
513 self._socketunlinked = True
501
514
502 def _cleanup(self):
515 def _cleanup(self):
503 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
516 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
504 self._sock.close()
517 self._sock.close()
505 self._unlinksocket()
518 self._unlinksocket()
506 # don't kill child processes as they have active clients, just wait
519 # don't kill child processes as they have active clients, just wait
507 self._reapworkers(0)
520 self._reapworkers(0)
508
521
509 def run(self):
522 def run(self):
510 try:
523 try:
511 self._mainloop()
524 self._mainloop()
512 finally:
525 finally:
513 self._cleanup()
526 self._cleanup()
514
527
515 def _mainloop(self):
528 def _mainloop(self):
516 exiting = False
529 exiting = False
517 h = self._servicehandler
530 h = self._servicehandler
518 selector = selectors.DefaultSelector()
531 selector = selectors.DefaultSelector()
519 selector.register(self._sock, selectors.EVENT_READ)
532 selector.register(self._sock, selectors.EVENT_READ)
520 while True:
533 while True:
521 if not exiting and h.shouldexit():
534 if not exiting and h.shouldexit():
522 # clients can no longer connect() to the domain socket, so
535 # clients can no longer connect() to the domain socket, so
523 # we stop queuing new requests.
536 # we stop queuing new requests.
524 # for requests that are queued (connect()-ed, but haven't been
537 # for requests that are queued (connect()-ed, but haven't been
525 # accept()-ed), handle them before exit. otherwise, clients
538 # accept()-ed), handle them before exit. otherwise, clients
526 # waiting for recv() will receive ECONNRESET.
539 # waiting for recv() will receive ECONNRESET.
527 self._unlinksocket()
540 self._unlinksocket()
528 exiting = True
541 exiting = True
529 try:
542 try:
530 ready = selector.select(timeout=h.pollinterval)
543 ready = selector.select(timeout=h.pollinterval)
531 except OSError as inst:
544 except OSError as inst:
532 # selectors2 raises ETIMEDOUT if timeout exceeded while
545 # selectors2 raises ETIMEDOUT if timeout exceeded while
533 # handling signal interrupt. That's probably wrong, but
546 # handling signal interrupt. That's probably wrong, but
534 # we can easily get around it.
547 # we can easily get around it.
535 if inst.errno != errno.ETIMEDOUT:
548 if inst.errno != errno.ETIMEDOUT:
536 raise
549 raise
537 ready = []
550 ready = []
538 if not ready:
551 if not ready:
539 # only exit if we completed all queued requests
552 # only exit if we completed all queued requests
540 if exiting:
553 if exiting:
541 break
554 break
542 continue
555 continue
543 try:
556 try:
544 conn, _addr = self._sock.accept()
557 conn, _addr = self._sock.accept()
545 except socket.error as inst:
558 except socket.error as inst:
546 if inst.args[0] == errno.EINTR:
559 if inst.args[0] == errno.EINTR:
547 continue
560 continue
548 raise
561 raise
549
562
550 pid = os.fork()
563 pid = os.fork()
551 if pid:
564 if pid:
552 try:
565 try:
553 self.ui.debug('forked worker process (pid=%d)\n' % pid)
566 self.ui.debug('forked worker process (pid=%d)\n' % pid)
554 self._workerpids.add(pid)
567 self._workerpids.add(pid)
555 h.newconnection()
568 h.newconnection()
556 finally:
569 finally:
557 conn.close() # release handle in parent process
570 conn.close() # release handle in parent process
558 else:
571 else:
559 try:
572 try:
560 selector.close()
573 selector.close()
561 self._sock.close()
574 self._sock.close()
562 self._runworker(conn)
575 self._runworker(conn)
563 conn.close()
576 conn.close()
564 os._exit(0)
577 os._exit(0)
565 except: # never return, hence no re-raises
578 except: # never return, hence no re-raises
566 try:
579 try:
567 self.ui.traceback(force=True)
580 self.ui.traceback(force=True)
568 finally:
581 finally:
569 os._exit(255)
582 os._exit(255)
570 selector.close()
583 selector.close()
571
584
572 def _sigchldhandler(self, signal, frame):
585 def _sigchldhandler(self, signal, frame):
573 self._reapworkers(os.WNOHANG)
586 self._reapworkers(os.WNOHANG)
574
587
575 def _reapworkers(self, options):
588 def _reapworkers(self, options):
576 while self._workerpids:
589 while self._workerpids:
577 try:
590 try:
578 pid, _status = os.waitpid(-1, options)
591 pid, _status = os.waitpid(-1, options)
579 except OSError as inst:
592 except OSError as inst:
580 if inst.errno == errno.EINTR:
593 if inst.errno == errno.EINTR:
581 continue
594 continue
582 if inst.errno != errno.ECHILD:
595 if inst.errno != errno.ECHILD:
583 raise
596 raise
584 # no child processes at all (reaped by other waitpid()?)
597 # no child processes at all (reaped by other waitpid()?)
585 self._workerpids.clear()
598 self._workerpids.clear()
586 return
599 return
587 if pid == 0:
600 if pid == 0:
588 # no waitable child processes
601 # no waitable child processes
589 return
602 return
590 self.ui.debug('worker process exited (pid=%d)\n' % pid)
603 self.ui.debug('worker process exited (pid=%d)\n' % pid)
591 self._workerpids.discard(pid)
604 self._workerpids.discard(pid)
592
605
593 def _runworker(self, conn):
606 def _runworker(self, conn):
594 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
607 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
595 _initworkerprocess()
608 _initworkerprocess()
596 h = self._servicehandler
609 h = self._servicehandler
597 try:
610 try:
598 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
611 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
599 finally:
612 finally:
600 gc.collect() # trigger __del__ since worker process uses os._exit
613 gc.collect() # trigger __del__ since worker process uses os._exit
@@ -1,210 +1,212 b''
1 # server.py - utility and factory of server
1 # server.py - utility and factory of server
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import os
10 import os
11
11
12 from .i18n import _
12 from .i18n import _
13
13
14 from . import (
14 from . import (
15 chgserver,
15 chgserver,
16 cmdutil,
16 cmdutil,
17 commandserver,
17 commandserver,
18 error,
18 error,
19 hgweb,
19 hgweb,
20 pycompat,
20 pycompat,
21 util,
21 util,
22 )
22 )
23
23
24 from .utils import (
24 from .utils import (
25 procutil,
25 procutil,
26 )
26 )
27
27
28 def runservice(opts, parentfn=None, initfn=None, runfn=None, logfile=None,
28 def runservice(opts, parentfn=None, initfn=None, runfn=None, logfile=None,
29 runargs=None, appendpid=False):
29 runargs=None, appendpid=False):
30 '''Run a command as a service.'''
30 '''Run a command as a service.'''
31
31
32 postexecargs = {}
32 postexecargs = {}
33
33
34 if opts['daemon_postexec']:
34 if opts['daemon_postexec']:
35 for inst in opts['daemon_postexec']:
35 for inst in opts['daemon_postexec']:
36 if inst.startswith('unlink:'):
36 if inst.startswith('unlink:'):
37 postexecargs['unlink'] = inst[7:]
37 postexecargs['unlink'] = inst[7:]
38 elif inst.startswith('chdir:'):
38 elif inst.startswith('chdir:'):
39 postexecargs['chdir'] = inst[6:]
39 postexecargs['chdir'] = inst[6:]
40 elif inst != 'none':
40 elif inst != 'none':
41 raise error.Abort(_('invalid value for --daemon-postexec: %s')
41 raise error.Abort(_('invalid value for --daemon-postexec: %s')
42 % inst)
42 % inst)
43
43
44 # When daemonized on Windows, redirect stdout/stderr to the lockfile (which
44 # When daemonized on Windows, redirect stdout/stderr to the lockfile (which
45 # gets cleaned up after the child is up and running), so that the parent can
45 # gets cleaned up after the child is up and running), so that the parent can
46 # read and print the error if this child dies early. See 594dd384803c. On
46 # read and print the error if this child dies early. See 594dd384803c. On
47 # other platforms, the child can write to the parent's stdio directly, until
47 # other platforms, the child can write to the parent's stdio directly, until
48 # it is redirected prior to runfn().
48 # it is redirected prior to runfn().
49 if pycompat.iswindows and opts['daemon_postexec']:
49 if pycompat.iswindows and opts['daemon_postexec']:
50 if 'unlink' in postexecargs and os.path.exists(postexecargs['unlink']):
50 if 'unlink' in postexecargs and os.path.exists(postexecargs['unlink']):
51 procutil.stdout.flush()
51 procutil.stdout.flush()
52 procutil.stderr.flush()
52 procutil.stderr.flush()
53
53
54 fd = os.open(postexecargs['unlink'],
54 fd = os.open(postexecargs['unlink'],
55 os.O_WRONLY | os.O_APPEND | os.O_BINARY)
55 os.O_WRONLY | os.O_APPEND | os.O_BINARY)
56 try:
56 try:
57 os.dup2(fd, procutil.stdout.fileno())
57 os.dup2(fd, procutil.stdout.fileno())
58 os.dup2(fd, procutil.stderr.fileno())
58 os.dup2(fd, procutil.stderr.fileno())
59 finally:
59 finally:
60 os.close(fd)
60 os.close(fd)
61
61
62 def writepid(pid):
62 def writepid(pid):
63 if opts['pid_file']:
63 if opts['pid_file']:
64 if appendpid:
64 if appendpid:
65 mode = 'ab'
65 mode = 'ab'
66 else:
66 else:
67 mode = 'wb'
67 mode = 'wb'
68 fp = open(opts['pid_file'], mode)
68 fp = open(opts['pid_file'], mode)
69 fp.write('%d\n' % pid)
69 fp.write('%d\n' % pid)
70 fp.close()
70 fp.close()
71
71
72 if opts['daemon'] and not opts['daemon_postexec']:
72 if opts['daemon'] and not opts['daemon_postexec']:
73 # Signal child process startup with file removal
73 # Signal child process startup with file removal
74 lockfd, lockpath = pycompat.mkstemp(prefix='hg-service-')
74 lockfd, lockpath = pycompat.mkstemp(prefix='hg-service-')
75 os.close(lockfd)
75 os.close(lockfd)
76 try:
76 try:
77 if not runargs:
77 if not runargs:
78 runargs = procutil.hgcmd() + pycompat.sysargv[1:]
78 runargs = procutil.hgcmd() + pycompat.sysargv[1:]
79 runargs.append('--daemon-postexec=unlink:%s' % lockpath)
79 runargs.append('--daemon-postexec=unlink:%s' % lockpath)
80 # Don't pass --cwd to the child process, because we've already
80 # Don't pass --cwd to the child process, because we've already
81 # changed directory.
81 # changed directory.
82 for i in pycompat.xrange(1, len(runargs)):
82 for i in pycompat.xrange(1, len(runargs)):
83 if runargs[i].startswith('--cwd='):
83 if runargs[i].startswith('--cwd='):
84 del runargs[i]
84 del runargs[i]
85 break
85 break
86 elif runargs[i].startswith('--cwd'):
86 elif runargs[i].startswith('--cwd'):
87 del runargs[i:i + 2]
87 del runargs[i:i + 2]
88 break
88 break
89 def condfn():
89 def condfn():
90 return not os.path.exists(lockpath)
90 return not os.path.exists(lockpath)
91 pid = procutil.rundetached(runargs, condfn)
91 pid = procutil.rundetached(runargs, condfn)
92 if pid < 0:
92 if pid < 0:
93 # If the daemonized process managed to write out an error msg,
93 # If the daemonized process managed to write out an error msg,
94 # report it.
94 # report it.
95 if pycompat.iswindows and os.path.exists(lockpath):
95 if pycompat.iswindows and os.path.exists(lockpath):
96 with open(lockpath, 'rb') as log:
96 with open(lockpath, 'rb') as log:
97 for line in log:
97 for line in log:
98 procutil.stderr.write(line)
98 procutil.stderr.write(line)
99 raise error.Abort(_('child process failed to start'))
99 raise error.Abort(_('child process failed to start'))
100 writepid(pid)
100 writepid(pid)
101 finally:
101 finally:
102 util.tryunlink(lockpath)
102 util.tryunlink(lockpath)
103 if parentfn:
103 if parentfn:
104 return parentfn(pid)
104 return parentfn(pid)
105 else:
105 else:
106 return
106 return
107
107
108 if initfn:
108 if initfn:
109 initfn()
109 initfn()
110
110
111 if not opts['daemon']:
111 if not opts['daemon']:
112 writepid(procutil.getpid())
112 writepid(procutil.getpid())
113
113
114 if opts['daemon_postexec']:
114 if opts['daemon_postexec']:
115 try:
115 try:
116 os.setsid()
116 os.setsid()
117 except AttributeError:
117 except AttributeError:
118 pass
118 pass
119
119
120 if 'chdir' in postexecargs:
120 if 'chdir' in postexecargs:
121 os.chdir(postexecargs['chdir'])
121 os.chdir(postexecargs['chdir'])
122 procutil.hidewindow()
122 procutil.hidewindow()
123 procutil.stdout.flush()
123 procutil.stdout.flush()
124 procutil.stderr.flush()
124 procutil.stderr.flush()
125
125
126 nullfd = os.open(os.devnull, os.O_RDWR)
126 nullfd = os.open(os.devnull, os.O_RDWR)
127 logfilefd = nullfd
127 logfilefd = nullfd
128 if logfile:
128 if logfile:
129 logfilefd = os.open(logfile, os.O_RDWR | os.O_CREAT | os.O_APPEND,
129 logfilefd = os.open(logfile, os.O_RDWR | os.O_CREAT | os.O_APPEND,
130 0o666)
130 0o666)
131 os.dup2(nullfd, procutil.stdin.fileno())
131 os.dup2(nullfd, procutil.stdin.fileno())
132 os.dup2(logfilefd, procutil.stdout.fileno())
132 os.dup2(logfilefd, procutil.stdout.fileno())
133 os.dup2(logfilefd, procutil.stderr.fileno())
133 os.dup2(logfilefd, procutil.stderr.fileno())
134 stdio = (procutil.stdin.fileno(), procutil.stdout.fileno(),
134 stdio = (procutil.stdin.fileno(), procutil.stdout.fileno(),
135 procutil.stderr.fileno())
135 procutil.stderr.fileno())
136 if nullfd not in stdio:
136 if nullfd not in stdio:
137 os.close(nullfd)
137 os.close(nullfd)
138 if logfile and logfilefd not in stdio:
138 if logfile and logfilefd not in stdio:
139 os.close(logfilefd)
139 os.close(logfilefd)
140
140
141 # Only unlink after redirecting stdout/stderr, so Windows doesn't
141 # Only unlink after redirecting stdout/stderr, so Windows doesn't
142 # complain about a sharing violation.
142 # complain about a sharing violation.
143 if 'unlink' in postexecargs:
143 if 'unlink' in postexecargs:
144 os.unlink(postexecargs['unlink'])
144 os.unlink(postexecargs['unlink'])
145
145
146 if runfn:
146 if runfn:
147 return runfn()
147 return runfn()
148
148
149 _cmdservicemap = {
149 _cmdservicemap = {
150 'chgunix': chgserver.chgunixservice,
150 'chgunix': chgserver.chgunixservice,
151 'pipe': commandserver.pipeservice,
151 'pipe': commandserver.pipeservice,
152 'unix': commandserver.unixforkingservice,
152 'unix': commandserver.unixforkingservice,
153 }
153 }
154
154
155 def _createcmdservice(ui, repo, opts):
155 def _createcmdservice(ui, repo, opts):
156 mode = opts['cmdserver']
156 mode = opts['cmdserver']
157 try:
157 try:
158 return _cmdservicemap[mode](ui, repo, opts)
158 servicefn = _cmdservicemap[mode]
159 except KeyError:
159 except KeyError:
160 raise error.Abort(_('unknown mode %s') % mode)
160 raise error.Abort(_('unknown mode %s') % mode)
161 commandserver.setuplogging(ui)
162 return servicefn(ui, repo, opts)
161
163
162 def _createhgwebservice(ui, repo, opts):
164 def _createhgwebservice(ui, repo, opts):
163 # this way we can check if something was given in the command-line
165 # this way we can check if something was given in the command-line
164 if opts.get('port'):
166 if opts.get('port'):
165 opts['port'] = util.getport(opts.get('port'))
167 opts['port'] = util.getport(opts.get('port'))
166
168
167 alluis = {ui}
169 alluis = {ui}
168 if repo:
170 if repo:
169 baseui = repo.baseui
171 baseui = repo.baseui
170 alluis.update([repo.baseui, repo.ui])
172 alluis.update([repo.baseui, repo.ui])
171 else:
173 else:
172 baseui = ui
174 baseui = ui
173 webconf = opts.get('web_conf') or opts.get('webdir_conf')
175 webconf = opts.get('web_conf') or opts.get('webdir_conf')
174 if webconf:
176 if webconf:
175 if opts.get('subrepos'):
177 if opts.get('subrepos'):
176 raise error.Abort(_('--web-conf cannot be used with --subrepos'))
178 raise error.Abort(_('--web-conf cannot be used with --subrepos'))
177
179
178 # load server settings (e.g. web.port) to "copied" ui, which allows
180 # load server settings (e.g. web.port) to "copied" ui, which allows
179 # hgwebdir to reload webconf cleanly
181 # hgwebdir to reload webconf cleanly
180 servui = ui.copy()
182 servui = ui.copy()
181 servui.readconfig(webconf, sections=['web'])
183 servui.readconfig(webconf, sections=['web'])
182 alluis.add(servui)
184 alluis.add(servui)
183 elif opts.get('subrepos'):
185 elif opts.get('subrepos'):
184 servui = ui
186 servui = ui
185
187
186 # If repo is None, hgweb.createapp() already raises a proper abort
188 # If repo is None, hgweb.createapp() already raises a proper abort
187 # message as long as webconf is None.
189 # message as long as webconf is None.
188 if repo:
190 if repo:
189 webconf = dict()
191 webconf = dict()
190 cmdutil.addwebdirpath(repo, "", webconf)
192 cmdutil.addwebdirpath(repo, "", webconf)
191 else:
193 else:
192 servui = ui
194 servui = ui
193
195
194 optlist = ("name templates style address port prefix ipv6"
196 optlist = ("name templates style address port prefix ipv6"
195 " accesslog errorlog certificate encoding")
197 " accesslog errorlog certificate encoding")
196 for o in optlist.split():
198 for o in optlist.split():
197 val = opts.get(o, '')
199 val = opts.get(o, '')
198 if val in (None, ''): # should check against default options instead
200 if val in (None, ''): # should check against default options instead
199 continue
201 continue
200 for u in alluis:
202 for u in alluis:
201 u.setconfig("web", o, val, 'serve')
203 u.setconfig("web", o, val, 'serve')
202
204
203 app = hgweb.createapp(baseui, repo, webconf)
205 app = hgweb.createapp(baseui, repo, webconf)
204 return hgweb.httpservice(servui, app, opts)
206 return hgweb.httpservice(servui, app, opts)
205
207
206 def createservice(ui, repo, opts):
208 def createservice(ui, repo, opts):
207 if opts["cmdserver"]:
209 if opts["cmdserver"]:
208 return _createcmdservice(ui, repo, opts)
210 return _createcmdservice(ui, repo, opts)
209 else:
211 else:
210 return _createhgwebservice(ui, repo, opts)
212 return _createhgwebservice(ui, repo, opts)
General Comments 0
You need to be logged in to leave comments. Login now