##// END OF EJS Templates
commandserver: prefer first-party selectors module from Python 3 to backport...
Augie Fackler -
r36958:b0ffcb54 default
parent child Browse files
Show More
@@ -1,551 +1,556 b''
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Matt Mackall <mpm@selenic.com>
3 # Copyright Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import gc
11 import gc
12 import os
12 import os
13 import random
13 import random
14 import signal
14 import signal
15 import socket
15 import socket
16 import struct
16 import struct
17 import traceback
17 import traceback
18
18
19 try:
20 import selectors
21 selectors.BaseSelector
22 except ImportError:
23 from .thirdparty import selectors2 as selectors
24
19 from .i18n import _
25 from .i18n import _
20 from .thirdparty import selectors2
21 from . import (
26 from . import (
22 encoding,
27 encoding,
23 error,
28 error,
24 pycompat,
29 pycompat,
25 util,
30 util,
26 )
31 )
27
32
28 logfile = None
33 logfile = None
29
34
30 def log(*args):
35 def log(*args):
31 if not logfile:
36 if not logfile:
32 return
37 return
33
38
34 for a in args:
39 for a in args:
35 logfile.write(str(a))
40 logfile.write(str(a))
36
41
37 logfile.flush()
42 logfile.flush()
38
43
39 class channeledoutput(object):
44 class channeledoutput(object):
40 """
45 """
41 Write data to out in the following format:
46 Write data to out in the following format:
42
47
43 data length (unsigned int),
48 data length (unsigned int),
44 data
49 data
45 """
50 """
46 def __init__(self, out, channel):
51 def __init__(self, out, channel):
47 self.out = out
52 self.out = out
48 self.channel = channel
53 self.channel = channel
49
54
50 @property
55 @property
51 def name(self):
56 def name(self):
52 return '<%c-channel>' % self.channel
57 return '<%c-channel>' % self.channel
53
58
54 def write(self, data):
59 def write(self, data):
55 if not data:
60 if not data:
56 return
61 return
57 # single write() to guarantee the same atomicity as the underlying file
62 # single write() to guarantee the same atomicity as the underlying file
58 self.out.write(struct.pack('>cI', self.channel, len(data)) + data)
63 self.out.write(struct.pack('>cI', self.channel, len(data)) + data)
59 self.out.flush()
64 self.out.flush()
60
65
61 def __getattr__(self, attr):
66 def __getattr__(self, attr):
62 if attr in ('isatty', 'fileno', 'tell', 'seek'):
67 if attr in ('isatty', 'fileno', 'tell', 'seek'):
63 raise AttributeError(attr)
68 raise AttributeError(attr)
64 return getattr(self.out, attr)
69 return getattr(self.out, attr)
65
70
66 class channeledinput(object):
71 class channeledinput(object):
67 """
72 """
68 Read data from in_.
73 Read data from in_.
69
74
70 Requests for input are written to out in the following format:
75 Requests for input are written to out in the following format:
71 channel identifier - 'I' for plain input, 'L' line based (1 byte)
76 channel identifier - 'I' for plain input, 'L' line based (1 byte)
72 how many bytes to send at most (unsigned int),
77 how many bytes to send at most (unsigned int),
73
78
74 The client replies with:
79 The client replies with:
75 data length (unsigned int), 0 meaning EOF
80 data length (unsigned int), 0 meaning EOF
76 data
81 data
77 """
82 """
78
83
79 maxchunksize = 4 * 1024
84 maxchunksize = 4 * 1024
80
85
81 def __init__(self, in_, out, channel):
86 def __init__(self, in_, out, channel):
82 self.in_ = in_
87 self.in_ = in_
83 self.out = out
88 self.out = out
84 self.channel = channel
89 self.channel = channel
85
90
86 @property
91 @property
87 def name(self):
92 def name(self):
88 return '<%c-channel>' % self.channel
93 return '<%c-channel>' % self.channel
89
94
90 def read(self, size=-1):
95 def read(self, size=-1):
91 if size < 0:
96 if size < 0:
92 # if we need to consume all the clients input, ask for 4k chunks
97 # if we need to consume all the clients input, ask for 4k chunks
93 # so the pipe doesn't fill up risking a deadlock
98 # so the pipe doesn't fill up risking a deadlock
94 size = self.maxchunksize
99 size = self.maxchunksize
95 s = self._read(size, self.channel)
100 s = self._read(size, self.channel)
96 buf = s
101 buf = s
97 while s:
102 while s:
98 s = self._read(size, self.channel)
103 s = self._read(size, self.channel)
99 buf += s
104 buf += s
100
105
101 return buf
106 return buf
102 else:
107 else:
103 return self._read(size, self.channel)
108 return self._read(size, self.channel)
104
109
105 def _read(self, size, channel):
110 def _read(self, size, channel):
106 if not size:
111 if not size:
107 return ''
112 return ''
108 assert size > 0
113 assert size > 0
109
114
110 # tell the client we need at most size bytes
115 # tell the client we need at most size bytes
111 self.out.write(struct.pack('>cI', channel, size))
116 self.out.write(struct.pack('>cI', channel, size))
112 self.out.flush()
117 self.out.flush()
113
118
114 length = self.in_.read(4)
119 length = self.in_.read(4)
115 length = struct.unpack('>I', length)[0]
120 length = struct.unpack('>I', length)[0]
116 if not length:
121 if not length:
117 return ''
122 return ''
118 else:
123 else:
119 return self.in_.read(length)
124 return self.in_.read(length)
120
125
121 def readline(self, size=-1):
126 def readline(self, size=-1):
122 if size < 0:
127 if size < 0:
123 size = self.maxchunksize
128 size = self.maxchunksize
124 s = self._read(size, 'L')
129 s = self._read(size, 'L')
125 buf = s
130 buf = s
126 # keep asking for more until there's either no more or
131 # keep asking for more until there's either no more or
127 # we got a full line
132 # we got a full line
128 while s and s[-1] != '\n':
133 while s and s[-1] != '\n':
129 s = self._read(size, 'L')
134 s = self._read(size, 'L')
130 buf += s
135 buf += s
131
136
132 return buf
137 return buf
133 else:
138 else:
134 return self._read(size, 'L')
139 return self._read(size, 'L')
135
140
136 def __iter__(self):
141 def __iter__(self):
137 return self
142 return self
138
143
139 def next(self):
144 def next(self):
140 l = self.readline()
145 l = self.readline()
141 if not l:
146 if not l:
142 raise StopIteration
147 raise StopIteration
143 return l
148 return l
144
149
145 def __getattr__(self, attr):
150 def __getattr__(self, attr):
146 if attr in ('isatty', 'fileno', 'tell', 'seek'):
151 if attr in ('isatty', 'fileno', 'tell', 'seek'):
147 raise AttributeError(attr)
152 raise AttributeError(attr)
148 return getattr(self.in_, attr)
153 return getattr(self.in_, attr)
149
154
150 class server(object):
155 class server(object):
151 """
156 """
152 Listens for commands on fin, runs them and writes the output on a channel
157 Listens for commands on fin, runs them and writes the output on a channel
153 based stream to fout.
158 based stream to fout.
154 """
159 """
155 def __init__(self, ui, repo, fin, fout):
160 def __init__(self, ui, repo, fin, fout):
156 self.cwd = pycompat.getcwd()
161 self.cwd = pycompat.getcwd()
157
162
158 # developer config: cmdserver.log
163 # developer config: cmdserver.log
159 logpath = ui.config("cmdserver", "log")
164 logpath = ui.config("cmdserver", "log")
160 if logpath:
165 if logpath:
161 global logfile
166 global logfile
162 if logpath == '-':
167 if logpath == '-':
163 # write log on a special 'd' (debug) channel
168 # write log on a special 'd' (debug) channel
164 logfile = channeledoutput(fout, 'd')
169 logfile = channeledoutput(fout, 'd')
165 else:
170 else:
166 logfile = open(logpath, 'a')
171 logfile = open(logpath, 'a')
167
172
168 if repo:
173 if repo:
169 # the ui here is really the repo ui so take its baseui so we don't
174 # the ui here is really the repo ui so take its baseui so we don't
170 # end up with its local configuration
175 # end up with its local configuration
171 self.ui = repo.baseui
176 self.ui = repo.baseui
172 self.repo = repo
177 self.repo = repo
173 self.repoui = repo.ui
178 self.repoui = repo.ui
174 else:
179 else:
175 self.ui = ui
180 self.ui = ui
176 self.repo = self.repoui = None
181 self.repo = self.repoui = None
177
182
178 self.cerr = channeledoutput(fout, 'e')
183 self.cerr = channeledoutput(fout, 'e')
179 self.cout = channeledoutput(fout, 'o')
184 self.cout = channeledoutput(fout, 'o')
180 self.cin = channeledinput(fin, fout, 'I')
185 self.cin = channeledinput(fin, fout, 'I')
181 self.cresult = channeledoutput(fout, 'r')
186 self.cresult = channeledoutput(fout, 'r')
182
187
183 self.client = fin
188 self.client = fin
184
189
185 def cleanup(self):
190 def cleanup(self):
186 """release and restore resources taken during server session"""
191 """release and restore resources taken during server session"""
187
192
188 def _read(self, size):
193 def _read(self, size):
189 if not size:
194 if not size:
190 return ''
195 return ''
191
196
192 data = self.client.read(size)
197 data = self.client.read(size)
193
198
194 # is the other end closed?
199 # is the other end closed?
195 if not data:
200 if not data:
196 raise EOFError
201 raise EOFError
197
202
198 return data
203 return data
199
204
200 def _readstr(self):
205 def _readstr(self):
201 """read a string from the channel
206 """read a string from the channel
202
207
203 format:
208 format:
204 data length (uint32), data
209 data length (uint32), data
205 """
210 """
206 length = struct.unpack('>I', self._read(4))[0]
211 length = struct.unpack('>I', self._read(4))[0]
207 if not length:
212 if not length:
208 return ''
213 return ''
209 return self._read(length)
214 return self._read(length)
210
215
211 def _readlist(self):
216 def _readlist(self):
212 """read a list of NULL separated strings from the channel"""
217 """read a list of NULL separated strings from the channel"""
213 s = self._readstr()
218 s = self._readstr()
214 if s:
219 if s:
215 return s.split('\0')
220 return s.split('\0')
216 else:
221 else:
217 return []
222 return []
218
223
219 def runcommand(self):
224 def runcommand(self):
220 """ reads a list of \0 terminated arguments, executes
225 """ reads a list of \0 terminated arguments, executes
221 and writes the return code to the result channel """
226 and writes the return code to the result channel """
222 from . import dispatch # avoid cycle
227 from . import dispatch # avoid cycle
223
228
224 args = self._readlist()
229 args = self._readlist()
225
230
226 # copy the uis so changes (e.g. --config or --verbose) don't
231 # copy the uis so changes (e.g. --config or --verbose) don't
227 # persist between requests
232 # persist between requests
228 copiedui = self.ui.copy()
233 copiedui = self.ui.copy()
229 uis = [copiedui]
234 uis = [copiedui]
230 if self.repo:
235 if self.repo:
231 self.repo.baseui = copiedui
236 self.repo.baseui = copiedui
232 # clone ui without using ui.copy because this is protected
237 # clone ui without using ui.copy because this is protected
233 repoui = self.repoui.__class__(self.repoui)
238 repoui = self.repoui.__class__(self.repoui)
234 repoui.copy = copiedui.copy # redo copy protection
239 repoui.copy = copiedui.copy # redo copy protection
235 uis.append(repoui)
240 uis.append(repoui)
236 self.repo.ui = self.repo.dirstate._ui = repoui
241 self.repo.ui = self.repo.dirstate._ui = repoui
237 self.repo.invalidateall()
242 self.repo.invalidateall()
238
243
239 for ui in uis:
244 for ui in uis:
240 ui.resetstate()
245 ui.resetstate()
241 # any kind of interaction must use server channels, but chg may
246 # any kind of interaction must use server channels, but chg may
242 # replace channels by fully functional tty files. so nontty is
247 # replace channels by fully functional tty files. so nontty is
243 # enforced only if cin is a channel.
248 # enforced only if cin is a channel.
244 if not util.safehasattr(self.cin, 'fileno'):
249 if not util.safehasattr(self.cin, 'fileno'):
245 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
250 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
246
251
247 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
252 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
248 self.cout, self.cerr)
253 self.cout, self.cerr)
249
254
250 try:
255 try:
251 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
256 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
252 self.cresult.write(struct.pack('>i', int(ret)))
257 self.cresult.write(struct.pack('>i', int(ret)))
253 finally:
258 finally:
254 # restore old cwd
259 # restore old cwd
255 if '--cwd' in args:
260 if '--cwd' in args:
256 os.chdir(self.cwd)
261 os.chdir(self.cwd)
257
262
258 def getencoding(self):
263 def getencoding(self):
259 """ writes the current encoding to the result channel """
264 """ writes the current encoding to the result channel """
260 self.cresult.write(encoding.encoding)
265 self.cresult.write(encoding.encoding)
261
266
262 def serveone(self):
267 def serveone(self):
263 cmd = self.client.readline()[:-1]
268 cmd = self.client.readline()[:-1]
264 if cmd:
269 if cmd:
265 handler = self.capabilities.get(cmd)
270 handler = self.capabilities.get(cmd)
266 if handler:
271 if handler:
267 handler(self)
272 handler(self)
268 else:
273 else:
269 # clients are expected to check what commands are supported by
274 # clients are expected to check what commands are supported by
270 # looking at the servers capabilities
275 # looking at the servers capabilities
271 raise error.Abort(_('unknown command %s') % cmd)
276 raise error.Abort(_('unknown command %s') % cmd)
272
277
273 return cmd != ''
278 return cmd != ''
274
279
275 capabilities = {'runcommand': runcommand,
280 capabilities = {'runcommand': runcommand,
276 'getencoding': getencoding}
281 'getencoding': getencoding}
277
282
278 def serve(self):
283 def serve(self):
279 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
284 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
280 hellomsg += '\n'
285 hellomsg += '\n'
281 hellomsg += 'encoding: ' + encoding.encoding
286 hellomsg += 'encoding: ' + encoding.encoding
282 hellomsg += '\n'
287 hellomsg += '\n'
283 hellomsg += 'pid: %d' % util.getpid()
288 hellomsg += 'pid: %d' % util.getpid()
284 if util.safehasattr(os, 'getpgid'):
289 if util.safehasattr(os, 'getpgid'):
285 hellomsg += '\n'
290 hellomsg += '\n'
286 hellomsg += 'pgid: %d' % os.getpgid(0)
291 hellomsg += 'pgid: %d' % os.getpgid(0)
287
292
288 # write the hello msg in -one- chunk
293 # write the hello msg in -one- chunk
289 self.cout.write(hellomsg)
294 self.cout.write(hellomsg)
290
295
291 try:
296 try:
292 while self.serveone():
297 while self.serveone():
293 pass
298 pass
294 except EOFError:
299 except EOFError:
295 # we'll get here if the client disconnected while we were reading
300 # we'll get here if the client disconnected while we were reading
296 # its request
301 # its request
297 return 1
302 return 1
298
303
299 return 0
304 return 0
300
305
301 def _protectio(ui):
306 def _protectio(ui):
302 """ duplicates streams and redirect original to null if ui uses stdio """
307 """ duplicates streams and redirect original to null if ui uses stdio """
303 ui.flush()
308 ui.flush()
304 newfiles = []
309 newfiles = []
305 nullfd = os.open(os.devnull, os.O_RDWR)
310 nullfd = os.open(os.devnull, os.O_RDWR)
306 for f, sysf, mode in [(ui.fin, util.stdin, r'rb'),
311 for f, sysf, mode in [(ui.fin, util.stdin, r'rb'),
307 (ui.fout, util.stdout, r'wb')]:
312 (ui.fout, util.stdout, r'wb')]:
308 if f is sysf:
313 if f is sysf:
309 newfd = os.dup(f.fileno())
314 newfd = os.dup(f.fileno())
310 os.dup2(nullfd, f.fileno())
315 os.dup2(nullfd, f.fileno())
311 f = os.fdopen(newfd, mode)
316 f = os.fdopen(newfd, mode)
312 newfiles.append(f)
317 newfiles.append(f)
313 os.close(nullfd)
318 os.close(nullfd)
314 return tuple(newfiles)
319 return tuple(newfiles)
315
320
316 def _restoreio(ui, fin, fout):
321 def _restoreio(ui, fin, fout):
317 """ restores streams from duplicated ones """
322 """ restores streams from duplicated ones """
318 ui.flush()
323 ui.flush()
319 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
324 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
320 if f is not uif:
325 if f is not uif:
321 os.dup2(f.fileno(), uif.fileno())
326 os.dup2(f.fileno(), uif.fileno())
322 f.close()
327 f.close()
323
328
324 class pipeservice(object):
329 class pipeservice(object):
325 def __init__(self, ui, repo, opts):
330 def __init__(self, ui, repo, opts):
326 self.ui = ui
331 self.ui = ui
327 self.repo = repo
332 self.repo = repo
328
333
329 def init(self):
334 def init(self):
330 pass
335 pass
331
336
332 def run(self):
337 def run(self):
333 ui = self.ui
338 ui = self.ui
334 # redirect stdio to null device so that broken extensions or in-process
339 # redirect stdio to null device so that broken extensions or in-process
335 # hooks will never cause corruption of channel protocol.
340 # hooks will never cause corruption of channel protocol.
336 fin, fout = _protectio(ui)
341 fin, fout = _protectio(ui)
337 try:
342 try:
338 sv = server(ui, self.repo, fin, fout)
343 sv = server(ui, self.repo, fin, fout)
339 return sv.serve()
344 return sv.serve()
340 finally:
345 finally:
341 sv.cleanup()
346 sv.cleanup()
342 _restoreio(ui, fin, fout)
347 _restoreio(ui, fin, fout)
343
348
344 def _initworkerprocess():
349 def _initworkerprocess():
345 # use a different process group from the master process, in order to:
350 # use a different process group from the master process, in order to:
346 # 1. make the current process group no longer "orphaned" (because the
351 # 1. make the current process group no longer "orphaned" (because the
347 # parent of this process is in a different process group while
352 # parent of this process is in a different process group while
348 # remains in a same session)
353 # remains in a same session)
349 # according to POSIX 2.2.2.52, orphaned process group will ignore
354 # according to POSIX 2.2.2.52, orphaned process group will ignore
350 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
355 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
351 # cause trouble for things like ncurses.
356 # cause trouble for things like ncurses.
352 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
357 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
353 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
358 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
354 # processes like ssh will be killed properly, without affecting
359 # processes like ssh will be killed properly, without affecting
355 # unrelated processes.
360 # unrelated processes.
356 os.setpgid(0, 0)
361 os.setpgid(0, 0)
357 # change random state otherwise forked request handlers would have a
362 # change random state otherwise forked request handlers would have a
358 # same state inherited from parent.
363 # same state inherited from parent.
359 random.seed()
364 random.seed()
360
365
361 def _serverequest(ui, repo, conn, createcmdserver):
366 def _serverequest(ui, repo, conn, createcmdserver):
362 fin = conn.makefile('rb')
367 fin = conn.makefile('rb')
363 fout = conn.makefile('wb')
368 fout = conn.makefile('wb')
364 sv = None
369 sv = None
365 try:
370 try:
366 sv = createcmdserver(repo, conn, fin, fout)
371 sv = createcmdserver(repo, conn, fin, fout)
367 try:
372 try:
368 sv.serve()
373 sv.serve()
369 # handle exceptions that may be raised by command server. most of
374 # handle exceptions that may be raised by command server. most of
370 # known exceptions are caught by dispatch.
375 # known exceptions are caught by dispatch.
371 except error.Abort as inst:
376 except error.Abort as inst:
372 ui.warn(_('abort: %s\n') % inst)
377 ui.warn(_('abort: %s\n') % inst)
373 except IOError as inst:
378 except IOError as inst:
374 if inst.errno != errno.EPIPE:
379 if inst.errno != errno.EPIPE:
375 raise
380 raise
376 except KeyboardInterrupt:
381 except KeyboardInterrupt:
377 pass
382 pass
378 finally:
383 finally:
379 sv.cleanup()
384 sv.cleanup()
380 except: # re-raises
385 except: # re-raises
381 # also write traceback to error channel. otherwise client cannot
386 # also write traceback to error channel. otherwise client cannot
382 # see it because it is written to server's stderr by default.
387 # see it because it is written to server's stderr by default.
383 if sv:
388 if sv:
384 cerr = sv.cerr
389 cerr = sv.cerr
385 else:
390 else:
386 cerr = channeledoutput(fout, 'e')
391 cerr = channeledoutput(fout, 'e')
387 traceback.print_exc(file=cerr)
392 traceback.print_exc(file=cerr)
388 raise
393 raise
389 finally:
394 finally:
390 fin.close()
395 fin.close()
391 try:
396 try:
392 fout.close() # implicit flush() may cause another EPIPE
397 fout.close() # implicit flush() may cause another EPIPE
393 except IOError as inst:
398 except IOError as inst:
394 if inst.errno != errno.EPIPE:
399 if inst.errno != errno.EPIPE:
395 raise
400 raise
396
401
397 class unixservicehandler(object):
402 class unixservicehandler(object):
398 """Set of pluggable operations for unix-mode services
403 """Set of pluggable operations for unix-mode services
399
404
400 Almost all methods except for createcmdserver() are called in the main
405 Almost all methods except for createcmdserver() are called in the main
401 process. You can't pass mutable resource back from createcmdserver().
406 process. You can't pass mutable resource back from createcmdserver().
402 """
407 """
403
408
404 pollinterval = None
409 pollinterval = None
405
410
406 def __init__(self, ui):
411 def __init__(self, ui):
407 self.ui = ui
412 self.ui = ui
408
413
409 def bindsocket(self, sock, address):
414 def bindsocket(self, sock, address):
410 util.bindunixsocket(sock, address)
415 util.bindunixsocket(sock, address)
411 sock.listen(socket.SOMAXCONN)
416 sock.listen(socket.SOMAXCONN)
412 self.ui.status(_('listening at %s\n') % address)
417 self.ui.status(_('listening at %s\n') % address)
413 self.ui.flush() # avoid buffering of status message
418 self.ui.flush() # avoid buffering of status message
414
419
415 def unlinksocket(self, address):
420 def unlinksocket(self, address):
416 os.unlink(address)
421 os.unlink(address)
417
422
418 def shouldexit(self):
423 def shouldexit(self):
419 """True if server should shut down; checked per pollinterval"""
424 """True if server should shut down; checked per pollinterval"""
420 return False
425 return False
421
426
422 def newconnection(self):
427 def newconnection(self):
423 """Called when main process notices new connection"""
428 """Called when main process notices new connection"""
424
429
425 def createcmdserver(self, repo, conn, fin, fout):
430 def createcmdserver(self, repo, conn, fin, fout):
426 """Create new command server instance; called in the process that
431 """Create new command server instance; called in the process that
427 serves for the current connection"""
432 serves for the current connection"""
428 return server(self.ui, repo, fin, fout)
433 return server(self.ui, repo, fin, fout)
429
434
430 class unixforkingservice(object):
435 class unixforkingservice(object):
431 """
436 """
432 Listens on unix domain socket and forks server per connection
437 Listens on unix domain socket and forks server per connection
433 """
438 """
434
439
435 def __init__(self, ui, repo, opts, handler=None):
440 def __init__(self, ui, repo, opts, handler=None):
436 self.ui = ui
441 self.ui = ui
437 self.repo = repo
442 self.repo = repo
438 self.address = opts['address']
443 self.address = opts['address']
439 if not util.safehasattr(socket, 'AF_UNIX'):
444 if not util.safehasattr(socket, 'AF_UNIX'):
440 raise error.Abort(_('unsupported platform'))
445 raise error.Abort(_('unsupported platform'))
441 if not self.address:
446 if not self.address:
442 raise error.Abort(_('no socket path specified with --address'))
447 raise error.Abort(_('no socket path specified with --address'))
443 self._servicehandler = handler or unixservicehandler(ui)
448 self._servicehandler = handler or unixservicehandler(ui)
444 self._sock = None
449 self._sock = None
445 self._oldsigchldhandler = None
450 self._oldsigchldhandler = None
446 self._workerpids = set() # updated by signal handler; do not iterate
451 self._workerpids = set() # updated by signal handler; do not iterate
447 self._socketunlinked = None
452 self._socketunlinked = None
448
453
449 def init(self):
454 def init(self):
450 self._sock = socket.socket(socket.AF_UNIX)
455 self._sock = socket.socket(socket.AF_UNIX)
451 self._servicehandler.bindsocket(self._sock, self.address)
456 self._servicehandler.bindsocket(self._sock, self.address)
452 if util.safehasattr(util, 'unblocksignal'):
457 if util.safehasattr(util, 'unblocksignal'):
453 util.unblocksignal(signal.SIGCHLD)
458 util.unblocksignal(signal.SIGCHLD)
454 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
459 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
455 self._oldsigchldhandler = o
460 self._oldsigchldhandler = o
456 self._socketunlinked = False
461 self._socketunlinked = False
457
462
458 def _unlinksocket(self):
463 def _unlinksocket(self):
459 if not self._socketunlinked:
464 if not self._socketunlinked:
460 self._servicehandler.unlinksocket(self.address)
465 self._servicehandler.unlinksocket(self.address)
461 self._socketunlinked = True
466 self._socketunlinked = True
462
467
463 def _cleanup(self):
468 def _cleanup(self):
464 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
469 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
465 self._sock.close()
470 self._sock.close()
466 self._unlinksocket()
471 self._unlinksocket()
467 # don't kill child processes as they have active clients, just wait
472 # don't kill child processes as they have active clients, just wait
468 self._reapworkers(0)
473 self._reapworkers(0)
469
474
470 def run(self):
475 def run(self):
471 try:
476 try:
472 self._mainloop()
477 self._mainloop()
473 finally:
478 finally:
474 self._cleanup()
479 self._cleanup()
475
480
476 def _mainloop(self):
481 def _mainloop(self):
477 exiting = False
482 exiting = False
478 h = self._servicehandler
483 h = self._servicehandler
479 selector = selectors2.DefaultSelector()
484 selector = selectors.DefaultSelector()
480 selector.register(self._sock, selectors2.EVENT_READ)
485 selector.register(self._sock, selectors.EVENT_READ)
481 while True:
486 while True:
482 if not exiting and h.shouldexit():
487 if not exiting and h.shouldexit():
483 # clients can no longer connect() to the domain socket, so
488 # clients can no longer connect() to the domain socket, so
484 # we stop queuing new requests.
489 # we stop queuing new requests.
485 # for requests that are queued (connect()-ed, but haven't been
490 # for requests that are queued (connect()-ed, but haven't been
486 # accept()-ed), handle them before exit. otherwise, clients
491 # accept()-ed), handle them before exit. otherwise, clients
487 # waiting for recv() will receive ECONNRESET.
492 # waiting for recv() will receive ECONNRESET.
488 self._unlinksocket()
493 self._unlinksocket()
489 exiting = True
494 exiting = True
490 ready = selector.select(timeout=h.pollinterval)
495 ready = selector.select(timeout=h.pollinterval)
491 if not ready:
496 if not ready:
492 # only exit if we completed all queued requests
497 # only exit if we completed all queued requests
493 if exiting:
498 if exiting:
494 break
499 break
495 continue
500 continue
496 try:
501 try:
497 conn, _addr = self._sock.accept()
502 conn, _addr = self._sock.accept()
498 except socket.error as inst:
503 except socket.error as inst:
499 if inst.args[0] == errno.EINTR:
504 if inst.args[0] == errno.EINTR:
500 continue
505 continue
501 raise
506 raise
502
507
503 pid = os.fork()
508 pid = os.fork()
504 if pid:
509 if pid:
505 try:
510 try:
506 self.ui.debug('forked worker process (pid=%d)\n' % pid)
511 self.ui.debug('forked worker process (pid=%d)\n' % pid)
507 self._workerpids.add(pid)
512 self._workerpids.add(pid)
508 h.newconnection()
513 h.newconnection()
509 finally:
514 finally:
510 conn.close() # release handle in parent process
515 conn.close() # release handle in parent process
511 else:
516 else:
512 try:
517 try:
513 self._runworker(conn)
518 self._runworker(conn)
514 conn.close()
519 conn.close()
515 os._exit(0)
520 os._exit(0)
516 except: # never return, hence no re-raises
521 except: # never return, hence no re-raises
517 try:
522 try:
518 self.ui.traceback(force=True)
523 self.ui.traceback(force=True)
519 finally:
524 finally:
520 os._exit(255)
525 os._exit(255)
521 selector.close()
526 selector.close()
522
527
523 def _sigchldhandler(self, signal, frame):
528 def _sigchldhandler(self, signal, frame):
524 self._reapworkers(os.WNOHANG)
529 self._reapworkers(os.WNOHANG)
525
530
526 def _reapworkers(self, options):
531 def _reapworkers(self, options):
527 while self._workerpids:
532 while self._workerpids:
528 try:
533 try:
529 pid, _status = os.waitpid(-1, options)
534 pid, _status = os.waitpid(-1, options)
530 except OSError as inst:
535 except OSError as inst:
531 if inst.errno == errno.EINTR:
536 if inst.errno == errno.EINTR:
532 continue
537 continue
533 if inst.errno != errno.ECHILD:
538 if inst.errno != errno.ECHILD:
534 raise
539 raise
535 # no child processes at all (reaped by other waitpid()?)
540 # no child processes at all (reaped by other waitpid()?)
536 self._workerpids.clear()
541 self._workerpids.clear()
537 return
542 return
538 if pid == 0:
543 if pid == 0:
539 # no waitable child processes
544 # no waitable child processes
540 return
545 return
541 self.ui.debug('worker process exited (pid=%d)\n' % pid)
546 self.ui.debug('worker process exited (pid=%d)\n' % pid)
542 self._workerpids.discard(pid)
547 self._workerpids.discard(pid)
543
548
544 def _runworker(self, conn):
549 def _runworker(self, conn):
545 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
550 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
546 _initworkerprocess()
551 _initworkerprocess()
547 h = self._servicehandler
552 h = self._servicehandler
548 try:
553 try:
549 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
554 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
550 finally:
555 finally:
551 gc.collect() # trigger __del__ since worker process uses os._exit
556 gc.collect() # trigger __del__ since worker process uses os._exit
General Comments 0
You need to be logged in to leave comments. Login now