##// END OF EJS Templates
commandserver: add new forking server implemented without using SocketServer...
Yuya Nishihara -
r29544:024e8f82 default
parent child Browse files
Show More
@@ -1,437 +1,562
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Matt Mackall <mpm@selenic.com>
3 # Copyright Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import gc
11 import gc
12 import os
12 import os
13 import random
13 import random
14 import select
15 import signal
16 import socket
14 import struct
17 import struct
15 import sys
18 import sys
16 import traceback
19 import traceback
17
20
18 from .i18n import _
21 from .i18n import _
19 from . import (
22 from . import (
20 encoding,
23 encoding,
21 error,
24 error,
22 util,
25 util,
23 )
26 )
24
27
25 socketserver = util.socketserver
28 socketserver = util.socketserver
26
29
27 logfile = None
30 logfile = None
28
31
29 def log(*args):
32 def log(*args):
30 if not logfile:
33 if not logfile:
31 return
34 return
32
35
33 for a in args:
36 for a in args:
34 logfile.write(str(a))
37 logfile.write(str(a))
35
38
36 logfile.flush()
39 logfile.flush()
37
40
38 class channeledoutput(object):
41 class channeledoutput(object):
39 """
42 """
40 Write data to out in the following format:
43 Write data to out in the following format:
41
44
42 data length (unsigned int),
45 data length (unsigned int),
43 data
46 data
44 """
47 """
45 def __init__(self, out, channel):
48 def __init__(self, out, channel):
46 self.out = out
49 self.out = out
47 self.channel = channel
50 self.channel = channel
48
51
49 @property
52 @property
50 def name(self):
53 def name(self):
51 return '<%c-channel>' % self.channel
54 return '<%c-channel>' % self.channel
52
55
53 def write(self, data):
56 def write(self, data):
54 if not data:
57 if not data:
55 return
58 return
56 self.out.write(struct.pack('>cI', self.channel, len(data)))
59 self.out.write(struct.pack('>cI', self.channel, len(data)))
57 self.out.write(data)
60 self.out.write(data)
58 self.out.flush()
61 self.out.flush()
59
62
60 def __getattr__(self, attr):
63 def __getattr__(self, attr):
61 if attr in ('isatty', 'fileno', 'tell', 'seek'):
64 if attr in ('isatty', 'fileno', 'tell', 'seek'):
62 raise AttributeError(attr)
65 raise AttributeError(attr)
63 return getattr(self.out, attr)
66 return getattr(self.out, attr)
64
67
65 class channeledinput(object):
68 class channeledinput(object):
66 """
69 """
67 Read data from in_.
70 Read data from in_.
68
71
69 Requests for input are written to out in the following format:
72 Requests for input are written to out in the following format:
70 channel identifier - 'I' for plain input, 'L' line based (1 byte)
73 channel identifier - 'I' for plain input, 'L' line based (1 byte)
71 how many bytes to send at most (unsigned int),
74 how many bytes to send at most (unsigned int),
72
75
73 The client replies with:
76 The client replies with:
74 data length (unsigned int), 0 meaning EOF
77 data length (unsigned int), 0 meaning EOF
75 data
78 data
76 """
79 """
77
80
78 maxchunksize = 4 * 1024
81 maxchunksize = 4 * 1024
79
82
80 def __init__(self, in_, out, channel):
83 def __init__(self, in_, out, channel):
81 self.in_ = in_
84 self.in_ = in_
82 self.out = out
85 self.out = out
83 self.channel = channel
86 self.channel = channel
84
87
85 @property
88 @property
86 def name(self):
89 def name(self):
87 return '<%c-channel>' % self.channel
90 return '<%c-channel>' % self.channel
88
91
89 def read(self, size=-1):
92 def read(self, size=-1):
90 if size < 0:
93 if size < 0:
91 # if we need to consume all the clients input, ask for 4k chunks
94 # if we need to consume all the clients input, ask for 4k chunks
92 # so the pipe doesn't fill up risking a deadlock
95 # so the pipe doesn't fill up risking a deadlock
93 size = self.maxchunksize
96 size = self.maxchunksize
94 s = self._read(size, self.channel)
97 s = self._read(size, self.channel)
95 buf = s
98 buf = s
96 while s:
99 while s:
97 s = self._read(size, self.channel)
100 s = self._read(size, self.channel)
98 buf += s
101 buf += s
99
102
100 return buf
103 return buf
101 else:
104 else:
102 return self._read(size, self.channel)
105 return self._read(size, self.channel)
103
106
104 def _read(self, size, channel):
107 def _read(self, size, channel):
105 if not size:
108 if not size:
106 return ''
109 return ''
107 assert size > 0
110 assert size > 0
108
111
109 # tell the client we need at most size bytes
112 # tell the client we need at most size bytes
110 self.out.write(struct.pack('>cI', channel, size))
113 self.out.write(struct.pack('>cI', channel, size))
111 self.out.flush()
114 self.out.flush()
112
115
113 length = self.in_.read(4)
116 length = self.in_.read(4)
114 length = struct.unpack('>I', length)[0]
117 length = struct.unpack('>I', length)[0]
115 if not length:
118 if not length:
116 return ''
119 return ''
117 else:
120 else:
118 return self.in_.read(length)
121 return self.in_.read(length)
119
122
120 def readline(self, size=-1):
123 def readline(self, size=-1):
121 if size < 0:
124 if size < 0:
122 size = self.maxchunksize
125 size = self.maxchunksize
123 s = self._read(size, 'L')
126 s = self._read(size, 'L')
124 buf = s
127 buf = s
125 # keep asking for more until there's either no more or
128 # keep asking for more until there's either no more or
126 # we got a full line
129 # we got a full line
127 while s and s[-1] != '\n':
130 while s and s[-1] != '\n':
128 s = self._read(size, 'L')
131 s = self._read(size, 'L')
129 buf += s
132 buf += s
130
133
131 return buf
134 return buf
132 else:
135 else:
133 return self._read(size, 'L')
136 return self._read(size, 'L')
134
137
135 def __iter__(self):
138 def __iter__(self):
136 return self
139 return self
137
140
138 def next(self):
141 def next(self):
139 l = self.readline()
142 l = self.readline()
140 if not l:
143 if not l:
141 raise StopIteration
144 raise StopIteration
142 return l
145 return l
143
146
144 def __getattr__(self, attr):
147 def __getattr__(self, attr):
145 if attr in ('isatty', 'fileno', 'tell', 'seek'):
148 if attr in ('isatty', 'fileno', 'tell', 'seek'):
146 raise AttributeError(attr)
149 raise AttributeError(attr)
147 return getattr(self.in_, attr)
150 return getattr(self.in_, attr)
148
151
149 class server(object):
152 class server(object):
150 """
153 """
151 Listens for commands on fin, runs them and writes the output on a channel
154 Listens for commands on fin, runs them and writes the output on a channel
152 based stream to fout.
155 based stream to fout.
153 """
156 """
154 def __init__(self, ui, repo, fin, fout):
157 def __init__(self, ui, repo, fin, fout):
155 self.cwd = os.getcwd()
158 self.cwd = os.getcwd()
156
159
157 # developer config: cmdserver.log
160 # developer config: cmdserver.log
158 logpath = ui.config("cmdserver", "log", None)
161 logpath = ui.config("cmdserver", "log", None)
159 if logpath:
162 if logpath:
160 global logfile
163 global logfile
161 if logpath == '-':
164 if logpath == '-':
162 # write log on a special 'd' (debug) channel
165 # write log on a special 'd' (debug) channel
163 logfile = channeledoutput(fout, 'd')
166 logfile = channeledoutput(fout, 'd')
164 else:
167 else:
165 logfile = open(logpath, 'a')
168 logfile = open(logpath, 'a')
166
169
167 if repo:
170 if repo:
168 # the ui here is really the repo ui so take its baseui so we don't
171 # the ui here is really the repo ui so take its baseui so we don't
169 # end up with its local configuration
172 # end up with its local configuration
170 self.ui = repo.baseui
173 self.ui = repo.baseui
171 self.repo = repo
174 self.repo = repo
172 self.repoui = repo.ui
175 self.repoui = repo.ui
173 else:
176 else:
174 self.ui = ui
177 self.ui = ui
175 self.repo = self.repoui = None
178 self.repo = self.repoui = None
176
179
177 self.cerr = channeledoutput(fout, 'e')
180 self.cerr = channeledoutput(fout, 'e')
178 self.cout = channeledoutput(fout, 'o')
181 self.cout = channeledoutput(fout, 'o')
179 self.cin = channeledinput(fin, fout, 'I')
182 self.cin = channeledinput(fin, fout, 'I')
180 self.cresult = channeledoutput(fout, 'r')
183 self.cresult = channeledoutput(fout, 'r')
181
184
182 self.client = fin
185 self.client = fin
183
186
184 def cleanup(self):
187 def cleanup(self):
185 """release and restore resources taken during server session"""
188 """release and restore resources taken during server session"""
186 pass
189 pass
187
190
188 def _read(self, size):
191 def _read(self, size):
189 if not size:
192 if not size:
190 return ''
193 return ''
191
194
192 data = self.client.read(size)
195 data = self.client.read(size)
193
196
194 # is the other end closed?
197 # is the other end closed?
195 if not data:
198 if not data:
196 raise EOFError
199 raise EOFError
197
200
198 return data
201 return data
199
202
200 def _readstr(self):
203 def _readstr(self):
201 """read a string from the channel
204 """read a string from the channel
202
205
203 format:
206 format:
204 data length (uint32), data
207 data length (uint32), data
205 """
208 """
206 length = struct.unpack('>I', self._read(4))[0]
209 length = struct.unpack('>I', self._read(4))[0]
207 if not length:
210 if not length:
208 return ''
211 return ''
209 return self._read(length)
212 return self._read(length)
210
213
211 def _readlist(self):
214 def _readlist(self):
212 """read a list of NULL separated strings from the channel"""
215 """read a list of NULL separated strings from the channel"""
213 s = self._readstr()
216 s = self._readstr()
214 if s:
217 if s:
215 return s.split('\0')
218 return s.split('\0')
216 else:
219 else:
217 return []
220 return []
218
221
219 def runcommand(self):
222 def runcommand(self):
220 """ reads a list of \0 terminated arguments, executes
223 """ reads a list of \0 terminated arguments, executes
221 and writes the return code to the result channel """
224 and writes the return code to the result channel """
222 from . import dispatch # avoid cycle
225 from . import dispatch # avoid cycle
223
226
224 args = self._readlist()
227 args = self._readlist()
225
228
226 # copy the uis so changes (e.g. --config or --verbose) don't
229 # copy the uis so changes (e.g. --config or --verbose) don't
227 # persist between requests
230 # persist between requests
228 copiedui = self.ui.copy()
231 copiedui = self.ui.copy()
229 uis = [copiedui]
232 uis = [copiedui]
230 if self.repo:
233 if self.repo:
231 self.repo.baseui = copiedui
234 self.repo.baseui = copiedui
232 # clone ui without using ui.copy because this is protected
235 # clone ui without using ui.copy because this is protected
233 repoui = self.repoui.__class__(self.repoui)
236 repoui = self.repoui.__class__(self.repoui)
234 repoui.copy = copiedui.copy # redo copy protection
237 repoui.copy = copiedui.copy # redo copy protection
235 uis.append(repoui)
238 uis.append(repoui)
236 self.repo.ui = self.repo.dirstate._ui = repoui
239 self.repo.ui = self.repo.dirstate._ui = repoui
237 self.repo.invalidateall()
240 self.repo.invalidateall()
238
241
239 for ui in uis:
242 for ui in uis:
240 ui.resetstate()
243 ui.resetstate()
241 # any kind of interaction must use server channels, but chg may
244 # any kind of interaction must use server channels, but chg may
242 # replace channels by fully functional tty files. so nontty is
245 # replace channels by fully functional tty files. so nontty is
243 # enforced only if cin is a channel.
246 # enforced only if cin is a channel.
244 if not util.safehasattr(self.cin, 'fileno'):
247 if not util.safehasattr(self.cin, 'fileno'):
245 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
248 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
246
249
247 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
250 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
248 self.cout, self.cerr)
251 self.cout, self.cerr)
249
252
250 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
253 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
251
254
252 # restore old cwd
255 # restore old cwd
253 if '--cwd' in args:
256 if '--cwd' in args:
254 os.chdir(self.cwd)
257 os.chdir(self.cwd)
255
258
256 self.cresult.write(struct.pack('>i', int(ret)))
259 self.cresult.write(struct.pack('>i', int(ret)))
257
260
258 def getencoding(self):
261 def getencoding(self):
259 """ writes the current encoding to the result channel """
262 """ writes the current encoding to the result channel """
260 self.cresult.write(encoding.encoding)
263 self.cresult.write(encoding.encoding)
261
264
262 def serveone(self):
265 def serveone(self):
263 cmd = self.client.readline()[:-1]
266 cmd = self.client.readline()[:-1]
264 if cmd:
267 if cmd:
265 handler = self.capabilities.get(cmd)
268 handler = self.capabilities.get(cmd)
266 if handler:
269 if handler:
267 handler(self)
270 handler(self)
268 else:
271 else:
269 # clients are expected to check what commands are supported by
272 # clients are expected to check what commands are supported by
270 # looking at the servers capabilities
273 # looking at the servers capabilities
271 raise error.Abort(_('unknown command %s') % cmd)
274 raise error.Abort(_('unknown command %s') % cmd)
272
275
273 return cmd != ''
276 return cmd != ''
274
277
275 capabilities = {'runcommand' : runcommand,
278 capabilities = {'runcommand' : runcommand,
276 'getencoding' : getencoding}
279 'getencoding' : getencoding}
277
280
278 def serve(self):
281 def serve(self):
279 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
282 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
280 hellomsg += '\n'
283 hellomsg += '\n'
281 hellomsg += 'encoding: ' + encoding.encoding
284 hellomsg += 'encoding: ' + encoding.encoding
282 hellomsg += '\n'
285 hellomsg += '\n'
283 hellomsg += 'pid: %d' % util.getpid()
286 hellomsg += 'pid: %d' % util.getpid()
284
287
285 # write the hello msg in -one- chunk
288 # write the hello msg in -one- chunk
286 self.cout.write(hellomsg)
289 self.cout.write(hellomsg)
287
290
288 try:
291 try:
289 while self.serveone():
292 while self.serveone():
290 pass
293 pass
291 except EOFError:
294 except EOFError:
292 # we'll get here if the client disconnected while we were reading
295 # we'll get here if the client disconnected while we were reading
293 # its request
296 # its request
294 return 1
297 return 1
295
298
296 return 0
299 return 0
297
300
298 def _protectio(ui):
301 def _protectio(ui):
299 """ duplicates streams and redirect original to null if ui uses stdio """
302 """ duplicates streams and redirect original to null if ui uses stdio """
300 ui.flush()
303 ui.flush()
301 newfiles = []
304 newfiles = []
302 nullfd = os.open(os.devnull, os.O_RDWR)
305 nullfd = os.open(os.devnull, os.O_RDWR)
303 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
306 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
304 (ui.fout, sys.stdout, 'wb')]:
307 (ui.fout, sys.stdout, 'wb')]:
305 if f is sysf:
308 if f is sysf:
306 newfd = os.dup(f.fileno())
309 newfd = os.dup(f.fileno())
307 os.dup2(nullfd, f.fileno())
310 os.dup2(nullfd, f.fileno())
308 f = os.fdopen(newfd, mode)
311 f = os.fdopen(newfd, mode)
309 newfiles.append(f)
312 newfiles.append(f)
310 os.close(nullfd)
313 os.close(nullfd)
311 return tuple(newfiles)
314 return tuple(newfiles)
312
315
313 def _restoreio(ui, fin, fout):
316 def _restoreio(ui, fin, fout):
314 """ restores streams from duplicated ones """
317 """ restores streams from duplicated ones """
315 ui.flush()
318 ui.flush()
316 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
319 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
317 if f is not uif:
320 if f is not uif:
318 os.dup2(f.fileno(), uif.fileno())
321 os.dup2(f.fileno(), uif.fileno())
319 f.close()
322 f.close()
320
323
321 class pipeservice(object):
324 class pipeservice(object):
322 def __init__(self, ui, repo, opts):
325 def __init__(self, ui, repo, opts):
323 self.ui = ui
326 self.ui = ui
324 self.repo = repo
327 self.repo = repo
325
328
326 def init(self):
329 def init(self):
327 pass
330 pass
328
331
329 def run(self):
332 def run(self):
330 ui = self.ui
333 ui = self.ui
331 # redirect stdio to null device so that broken extensions or in-process
334 # redirect stdio to null device so that broken extensions or in-process
332 # hooks will never cause corruption of channel protocol.
335 # hooks will never cause corruption of channel protocol.
333 fin, fout = _protectio(ui)
336 fin, fout = _protectio(ui)
334 try:
337 try:
335 sv = server(ui, self.repo, fin, fout)
338 sv = server(ui, self.repo, fin, fout)
336 return sv.serve()
339 return sv.serve()
337 finally:
340 finally:
338 sv.cleanup()
341 sv.cleanup()
339 _restoreio(ui, fin, fout)
342 _restoreio(ui, fin, fout)
340
343
341 def _serverequest(ui, repo, conn, createcmdserver):
344 def _serverequest(ui, repo, conn, createcmdserver):
342 if True: # TODO: unindent
345 if True: # TODO: unindent
343 # use a different process group from the master process, making this
346 # use a different process group from the master process, making this
344 # process pass kernel "is_current_pgrp_orphaned" check so signals like
347 # process pass kernel "is_current_pgrp_orphaned" check so signals like
345 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
348 # SIGTSTP, SIGTTIN, SIGTTOU are not ignored.
346 os.setpgid(0, 0)
349 os.setpgid(0, 0)
347 # change random state otherwise forked request handlers would have a
350 # change random state otherwise forked request handlers would have a
348 # same state inherited from parent.
351 # same state inherited from parent.
349 random.seed()
352 random.seed()
350
353
351 fin = conn.makefile('rb')
354 fin = conn.makefile('rb')
352 fout = conn.makefile('wb')
355 fout = conn.makefile('wb')
353 sv = None
356 sv = None
354 try:
357 try:
355 sv = createcmdserver(repo, conn, fin, fout)
358 sv = createcmdserver(repo, conn, fin, fout)
356 try:
359 try:
357 sv.serve()
360 sv.serve()
358 # handle exceptions that may be raised by command server. most of
361 # handle exceptions that may be raised by command server. most of
359 # known exceptions are caught by dispatch.
362 # known exceptions are caught by dispatch.
360 except error.Abort as inst:
363 except error.Abort as inst:
361 ui.warn(_('abort: %s\n') % inst)
364 ui.warn(_('abort: %s\n') % inst)
362 except IOError as inst:
365 except IOError as inst:
363 if inst.errno != errno.EPIPE:
366 if inst.errno != errno.EPIPE:
364 raise
367 raise
365 except KeyboardInterrupt:
368 except KeyboardInterrupt:
366 pass
369 pass
367 finally:
370 finally:
368 sv.cleanup()
371 sv.cleanup()
369 except: # re-raises
372 except: # re-raises
370 # also write traceback to error channel. otherwise client cannot
373 # also write traceback to error channel. otherwise client cannot
371 # see it because it is written to server's stderr by default.
374 # see it because it is written to server's stderr by default.
372 if sv:
375 if sv:
373 cerr = sv.cerr
376 cerr = sv.cerr
374 else:
377 else:
375 cerr = channeledoutput(fout, 'e')
378 cerr = channeledoutput(fout, 'e')
376 traceback.print_exc(file=cerr)
379 traceback.print_exc(file=cerr)
377 raise
380 raise
378 finally:
381 finally:
379 fin.close()
382 fin.close()
380 try:
383 try:
381 fout.close() # implicit flush() may cause another EPIPE
384 fout.close() # implicit flush() may cause another EPIPE
382 except IOError as inst:
385 except IOError as inst:
383 if inst.errno != errno.EPIPE:
386 if inst.errno != errno.EPIPE:
384 raise
387 raise
385 # trigger __del__ since ForkingMixIn uses os._exit
388 # trigger __del__ since ForkingMixIn uses os._exit
386 gc.collect()
389 gc.collect()
387
390
391 class unixservicehandler(object):
392 """Set of pluggable operations for unix-mode services
393
394 Almost all methods except for createcmdserver() are called in the main
395 process. You can't pass mutable resource back from createcmdserver().
396 """
397
398 pollinterval = None
399
400 def __init__(self, ui):
401 self.ui = ui
402
403 def bindsocket(self, sock, address):
404 util.bindunixsocket(sock, address)
405
406 def unlinksocket(self, address):
407 os.unlink(address)
408
409 def printbanner(self, address):
410 self.ui.status(_('listening at %s\n') % address)
411 self.ui.flush() # avoid buffering of status message
412
413 def shouldexit(self):
414 """True if server should shut down; checked per pollinterval"""
415 return False
416
417 def newconnection(self):
418 """Called when main process notices new connection"""
419 pass
420
421 def createcmdserver(self, repo, conn, fin, fout):
422 """Create new command server instance; called in the process that
423 serves for the current connection"""
424 return server(self.ui, repo, fin, fout)
425
388 class _requesthandler(socketserver.BaseRequestHandler):
426 class _requesthandler(socketserver.BaseRequestHandler):
389 def handle(self):
427 def handle(self):
390 _serverequest(self.server.ui, self.server.repo, self.request,
428 _serverequest(self.server.ui, self.server.repo, self.request,
391 self._createcmdserver)
429 self._createcmdserver)
392
430
393 def _createcmdserver(self, repo, conn, fin, fout):
431 def _createcmdserver(self, repo, conn, fin, fout):
394 ui = self.server.ui
432 ui = self.server.ui
395 return server(ui, repo, fin, fout)
433 return server(ui, repo, fin, fout)
396
434
397 class unixservice(object):
435 class unixservice(object):
398 """
436 """
399 Listens on unix domain socket and forks server per connection
437 Listens on unix domain socket and forks server per connection
400 """
438 """
401 def __init__(self, ui, repo, opts):
439 def __init__(self, ui, repo, opts):
402 self.ui = ui
440 self.ui = ui
403 self.repo = repo
441 self.repo = repo
404 self.address = opts['address']
442 self.address = opts['address']
405 if not util.safehasattr(socketserver, 'UnixStreamServer'):
443 if not util.safehasattr(socketserver, 'UnixStreamServer'):
406 raise error.Abort(_('unsupported platform'))
444 raise error.Abort(_('unsupported platform'))
407 if not self.address:
445 if not self.address:
408 raise error.Abort(_('no socket path specified with --address'))
446 raise error.Abort(_('no socket path specified with --address'))
409
447
410 def init(self):
448 def init(self):
411 class cls(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
449 class cls(socketserver.ForkingMixIn, socketserver.UnixStreamServer):
412 ui = self.ui
450 ui = self.ui
413 repo = self.repo
451 repo = self.repo
414 self.server = cls(self.address, _requesthandler)
452 self.server = cls(self.address, _requesthandler)
415 self.ui.status(_('listening at %s\n') % self.address)
453 self.ui.status(_('listening at %s\n') % self.address)
416 self.ui.flush() # avoid buffering of status message
454 self.ui.flush() # avoid buffering of status message
417
455
418 def _cleanup(self):
456 def _cleanup(self):
419 os.unlink(self.address)
457 os.unlink(self.address)
420
458
421 def run(self):
459 def run(self):
422 try:
460 try:
423 self.server.serve_forever()
461 self.server.serve_forever()
424 finally:
462 finally:
425 self._cleanup()
463 self._cleanup()
426
464
465 class unixforkingservice(unixservice):
466 def __init__(self, ui, repo, opts, handler=None):
467 super(unixforkingservice, self).__init__(ui, repo, opts)
468 self._servicehandler = handler or unixservicehandler(ui)
469 self._sock = None
470 self._oldsigchldhandler = None
471 self._workerpids = set() # updated by signal handler; do not iterate
472
473 def init(self):
474 self._sock = socket.socket(socket.AF_UNIX)
475 self._servicehandler.bindsocket(self._sock, self.address)
476 self._sock.listen(5)
477 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
478 self._oldsigchldhandler = o
479 self._servicehandler.printbanner(self.address)
480
481 def _cleanup(self):
482 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
483 self._sock.close()
484 self._servicehandler.unlinksocket(self.address)
485 # don't kill child processes as they have active clients, just wait
486 self._reapworkers(0)
487
488 def run(self):
489 try:
490 self._mainloop()
491 finally:
492 self._cleanup()
493
494 def _mainloop(self):
495 h = self._servicehandler
496 while not h.shouldexit():
497 try:
498 ready = select.select([self._sock], [], [], h.pollinterval)[0]
499 if not ready:
500 continue
501 conn, _addr = self._sock.accept()
502 except (select.error, socket.error) as inst:
503 if inst.args[0] == errno.EINTR:
504 continue
505 raise
506
507 pid = os.fork()
508 if pid:
509 try:
510 self.ui.debug('forked worker process (pid=%d)\n' % pid)
511 self._workerpids.add(pid)
512 h.newconnection()
513 finally:
514 conn.close() # release handle in parent process
515 else:
516 try:
517 self._serveworker(conn)
518 conn.close()
519 os._exit(0)
520 except: # never return, hence no re-raises
521 try:
522 self.ui.traceback(force=True)
523 finally:
524 os._exit(255)
525
526 def _sigchldhandler(self, signal, frame):
527 self._reapworkers(os.WNOHANG)
528
529 def _reapworkers(self, options):
530 while self._workerpids:
531 try:
532 pid, _status = os.waitpid(-1, options)
533 except OSError as inst:
534 if inst.errno == errno.EINTR:
535 continue
536 if inst.errno != errno.ECHILD:
537 raise
538 # no child processes at all (reaped by other waitpid()?)
539 self._workerpids.clear()
540 return
541 if pid == 0:
542 # no waitable child processes
543 return
544 self.ui.debug('worker process exited (pid=%d)\n' % pid)
545 self._workerpids.discard(pid)
546
547 def _serveworker(self, conn):
548 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
549 h = self._servicehandler
550 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
551
427 _servicemap = {
552 _servicemap = {
428 'pipe': pipeservice,
553 'pipe': pipeservice,
429 'unix': unixservice,
554 'unix': unixforkingservice,
430 }
555 }
431
556
432 def createservice(ui, repo, opts):
557 def createservice(ui, repo, opts):
433 mode = opts['cmdserver']
558 mode = opts['cmdserver']
434 try:
559 try:
435 return _servicemap[mode](ui, repo, opts)
560 return _servicemap[mode](ui, repo, opts)
436 except KeyError:
561 except KeyError:
437 raise error.Abort(_('unknown mode %s') % mode)
562 raise error.Abort(_('unknown mode %s') % mode)
General Comments 0
You need to be logged in to leave comments. Login now