##// END OF EJS Templates
commandserver: remove superfluous pass statements
Augie Fackler -
r34372:76b334b9 default
parent child Browse files
Show More
@@ -1,551 +1,549 b''
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Matt Mackall <mpm@selenic.com>
3 # Copyright Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import gc
11 import gc
12 import os
12 import os
13 import random
13 import random
14 import signal
14 import signal
15 import socket
15 import socket
16 import struct
16 import struct
17 import traceback
17 import traceback
18
18
19 from .i18n import _
19 from .i18n import _
20 from . import (
20 from . import (
21 encoding,
21 encoding,
22 error,
22 error,
23 pycompat,
23 pycompat,
24 selectors2,
24 selectors2,
25 util,
25 util,
26 )
26 )
27
27
28 logfile = None
28 logfile = None
29
29
30 def log(*args):
30 def log(*args):
31 if not logfile:
31 if not logfile:
32 return
32 return
33
33
34 for a in args:
34 for a in args:
35 logfile.write(str(a))
35 logfile.write(str(a))
36
36
37 logfile.flush()
37 logfile.flush()
38
38
39 class channeledoutput(object):
39 class channeledoutput(object):
40 """
40 """
41 Write data to out in the following format:
41 Write data to out in the following format:
42
42
43 data length (unsigned int),
43 data length (unsigned int),
44 data
44 data
45 """
45 """
46 def __init__(self, out, channel):
46 def __init__(self, out, channel):
47 self.out = out
47 self.out = out
48 self.channel = channel
48 self.channel = channel
49
49
50 @property
50 @property
51 def name(self):
51 def name(self):
52 return '<%c-channel>' % self.channel
52 return '<%c-channel>' % self.channel
53
53
54 def write(self, data):
54 def write(self, data):
55 if not data:
55 if not data:
56 return
56 return
57 # single write() to guarantee the same atomicity as the underlying file
57 # single write() to guarantee the same atomicity as the underlying file
58 self.out.write(struct.pack('>cI', self.channel, len(data)) + data)
58 self.out.write(struct.pack('>cI', self.channel, len(data)) + data)
59 self.out.flush()
59 self.out.flush()
60
60
61 def __getattr__(self, attr):
61 def __getattr__(self, attr):
62 if attr in ('isatty', 'fileno', 'tell', 'seek'):
62 if attr in ('isatty', 'fileno', 'tell', 'seek'):
63 raise AttributeError(attr)
63 raise AttributeError(attr)
64 return getattr(self.out, attr)
64 return getattr(self.out, attr)
65
65
66 class channeledinput(object):
66 class channeledinput(object):
67 """
67 """
68 Read data from in_.
68 Read data from in_.
69
69
70 Requests for input are written to out in the following format:
70 Requests for input are written to out in the following format:
71 channel identifier - 'I' for plain input, 'L' line based (1 byte)
71 channel identifier - 'I' for plain input, 'L' line based (1 byte)
72 how many bytes to send at most (unsigned int),
72 how many bytes to send at most (unsigned int),
73
73
74 The client replies with:
74 The client replies with:
75 data length (unsigned int), 0 meaning EOF
75 data length (unsigned int), 0 meaning EOF
76 data
76 data
77 """
77 """
78
78
79 maxchunksize = 4 * 1024
79 maxchunksize = 4 * 1024
80
80
81 def __init__(self, in_, out, channel):
81 def __init__(self, in_, out, channel):
82 self.in_ = in_
82 self.in_ = in_
83 self.out = out
83 self.out = out
84 self.channel = channel
84 self.channel = channel
85
85
86 @property
86 @property
87 def name(self):
87 def name(self):
88 return '<%c-channel>' % self.channel
88 return '<%c-channel>' % self.channel
89
89
90 def read(self, size=-1):
90 def read(self, size=-1):
91 if size < 0:
91 if size < 0:
92 # if we need to consume all the clients input, ask for 4k chunks
92 # if we need to consume all the clients input, ask for 4k chunks
93 # so the pipe doesn't fill up risking a deadlock
93 # so the pipe doesn't fill up risking a deadlock
94 size = self.maxchunksize
94 size = self.maxchunksize
95 s = self._read(size, self.channel)
95 s = self._read(size, self.channel)
96 buf = s
96 buf = s
97 while s:
97 while s:
98 s = self._read(size, self.channel)
98 s = self._read(size, self.channel)
99 buf += s
99 buf += s
100
100
101 return buf
101 return buf
102 else:
102 else:
103 return self._read(size, self.channel)
103 return self._read(size, self.channel)
104
104
105 def _read(self, size, channel):
105 def _read(self, size, channel):
106 if not size:
106 if not size:
107 return ''
107 return ''
108 assert size > 0
108 assert size > 0
109
109
110 # tell the client we need at most size bytes
110 # tell the client we need at most size bytes
111 self.out.write(struct.pack('>cI', channel, size))
111 self.out.write(struct.pack('>cI', channel, size))
112 self.out.flush()
112 self.out.flush()
113
113
114 length = self.in_.read(4)
114 length = self.in_.read(4)
115 length = struct.unpack('>I', length)[0]
115 length = struct.unpack('>I', length)[0]
116 if not length:
116 if not length:
117 return ''
117 return ''
118 else:
118 else:
119 return self.in_.read(length)
119 return self.in_.read(length)
120
120
121 def readline(self, size=-1):
121 def readline(self, size=-1):
122 if size < 0:
122 if size < 0:
123 size = self.maxchunksize
123 size = self.maxchunksize
124 s = self._read(size, 'L')
124 s = self._read(size, 'L')
125 buf = s
125 buf = s
126 # keep asking for more until there's either no more or
126 # keep asking for more until there's either no more or
127 # we got a full line
127 # we got a full line
128 while s and s[-1] != '\n':
128 while s and s[-1] != '\n':
129 s = self._read(size, 'L')
129 s = self._read(size, 'L')
130 buf += s
130 buf += s
131
131
132 return buf
132 return buf
133 else:
133 else:
134 return self._read(size, 'L')
134 return self._read(size, 'L')
135
135
136 def __iter__(self):
136 def __iter__(self):
137 return self
137 return self
138
138
139 def next(self):
139 def next(self):
140 l = self.readline()
140 l = self.readline()
141 if not l:
141 if not l:
142 raise StopIteration
142 raise StopIteration
143 return l
143 return l
144
144
145 def __getattr__(self, attr):
145 def __getattr__(self, attr):
146 if attr in ('isatty', 'fileno', 'tell', 'seek'):
146 if attr in ('isatty', 'fileno', 'tell', 'seek'):
147 raise AttributeError(attr)
147 raise AttributeError(attr)
148 return getattr(self.in_, attr)
148 return getattr(self.in_, attr)
149
149
150 class server(object):
150 class server(object):
151 """
151 """
152 Listens for commands on fin, runs them and writes the output on a channel
152 Listens for commands on fin, runs them and writes the output on a channel
153 based stream to fout.
153 based stream to fout.
154 """
154 """
155 def __init__(self, ui, repo, fin, fout):
155 def __init__(self, ui, repo, fin, fout):
156 self.cwd = pycompat.getcwd()
156 self.cwd = pycompat.getcwd()
157
157
158 # developer config: cmdserver.log
158 # developer config: cmdserver.log
159 logpath = ui.config("cmdserver", "log")
159 logpath = ui.config("cmdserver", "log")
160 if logpath:
160 if logpath:
161 global logfile
161 global logfile
162 if logpath == '-':
162 if logpath == '-':
163 # write log on a special 'd' (debug) channel
163 # write log on a special 'd' (debug) channel
164 logfile = channeledoutput(fout, 'd')
164 logfile = channeledoutput(fout, 'd')
165 else:
165 else:
166 logfile = open(logpath, 'a')
166 logfile = open(logpath, 'a')
167
167
168 if repo:
168 if repo:
169 # the ui here is really the repo ui so take its baseui so we don't
169 # the ui here is really the repo ui so take its baseui so we don't
170 # end up with its local configuration
170 # end up with its local configuration
171 self.ui = repo.baseui
171 self.ui = repo.baseui
172 self.repo = repo
172 self.repo = repo
173 self.repoui = repo.ui
173 self.repoui = repo.ui
174 else:
174 else:
175 self.ui = ui
175 self.ui = ui
176 self.repo = self.repoui = None
176 self.repo = self.repoui = None
177
177
178 self.cerr = channeledoutput(fout, 'e')
178 self.cerr = channeledoutput(fout, 'e')
179 self.cout = channeledoutput(fout, 'o')
179 self.cout = channeledoutput(fout, 'o')
180 self.cin = channeledinput(fin, fout, 'I')
180 self.cin = channeledinput(fin, fout, 'I')
181 self.cresult = channeledoutput(fout, 'r')
181 self.cresult = channeledoutput(fout, 'r')
182
182
183 self.client = fin
183 self.client = fin
184
184
185 def cleanup(self):
185 def cleanup(self):
186 """release and restore resources taken during server session"""
186 """release and restore resources taken during server session"""
187 pass
188
187
189 def _read(self, size):
188 def _read(self, size):
190 if not size:
189 if not size:
191 return ''
190 return ''
192
191
193 data = self.client.read(size)
192 data = self.client.read(size)
194
193
195 # is the other end closed?
194 # is the other end closed?
196 if not data:
195 if not data:
197 raise EOFError
196 raise EOFError
198
197
199 return data
198 return data
200
199
201 def _readstr(self):
200 def _readstr(self):
202 """read a string from the channel
201 """read a string from the channel
203
202
204 format:
203 format:
205 data length (uint32), data
204 data length (uint32), data
206 """
205 """
207 length = struct.unpack('>I', self._read(4))[0]
206 length = struct.unpack('>I', self._read(4))[0]
208 if not length:
207 if not length:
209 return ''
208 return ''
210 return self._read(length)
209 return self._read(length)
211
210
212 def _readlist(self):
211 def _readlist(self):
213 """read a list of NULL separated strings from the channel"""
212 """read a list of NULL separated strings from the channel"""
214 s = self._readstr()
213 s = self._readstr()
215 if s:
214 if s:
216 return s.split('\0')
215 return s.split('\0')
217 else:
216 else:
218 return []
217 return []
219
218
220 def runcommand(self):
219 def runcommand(self):
221 """ reads a list of \0 terminated arguments, executes
220 """ reads a list of \0 terminated arguments, executes
222 and writes the return code to the result channel """
221 and writes the return code to the result channel """
223 from . import dispatch # avoid cycle
222 from . import dispatch # avoid cycle
224
223
225 args = self._readlist()
224 args = self._readlist()
226
225
227 # copy the uis so changes (e.g. --config or --verbose) don't
226 # copy the uis so changes (e.g. --config or --verbose) don't
228 # persist between requests
227 # persist between requests
229 copiedui = self.ui.copy()
228 copiedui = self.ui.copy()
230 uis = [copiedui]
229 uis = [copiedui]
231 if self.repo:
230 if self.repo:
232 self.repo.baseui = copiedui
231 self.repo.baseui = copiedui
233 # clone ui without using ui.copy because this is protected
232 # clone ui without using ui.copy because this is protected
234 repoui = self.repoui.__class__(self.repoui)
233 repoui = self.repoui.__class__(self.repoui)
235 repoui.copy = copiedui.copy # redo copy protection
234 repoui.copy = copiedui.copy # redo copy protection
236 uis.append(repoui)
235 uis.append(repoui)
237 self.repo.ui = self.repo.dirstate._ui = repoui
236 self.repo.ui = self.repo.dirstate._ui = repoui
238 self.repo.invalidateall()
237 self.repo.invalidateall()
239
238
240 for ui in uis:
239 for ui in uis:
241 ui.resetstate()
240 ui.resetstate()
242 # any kind of interaction must use server channels, but chg may
241 # any kind of interaction must use server channels, but chg may
243 # replace channels by fully functional tty files. so nontty is
242 # replace channels by fully functional tty files. so nontty is
244 # enforced only if cin is a channel.
243 # enforced only if cin is a channel.
245 if not util.safehasattr(self.cin, 'fileno'):
244 if not util.safehasattr(self.cin, 'fileno'):
246 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
245 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
247
246
248 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
247 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
249 self.cout, self.cerr)
248 self.cout, self.cerr)
250
249
251 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
250 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
252
251
253 # restore old cwd
252 # restore old cwd
254 if '--cwd' in args:
253 if '--cwd' in args:
255 os.chdir(self.cwd)
254 os.chdir(self.cwd)
256
255
257 self.cresult.write(struct.pack('>i', int(ret)))
256 self.cresult.write(struct.pack('>i', int(ret)))
258
257
259 def getencoding(self):
258 def getencoding(self):
260 """ writes the current encoding to the result channel """
259 """ writes the current encoding to the result channel """
261 self.cresult.write(encoding.encoding)
260 self.cresult.write(encoding.encoding)
262
261
263 def serveone(self):
262 def serveone(self):
264 cmd = self.client.readline()[:-1]
263 cmd = self.client.readline()[:-1]
265 if cmd:
264 if cmd:
266 handler = self.capabilities.get(cmd)
265 handler = self.capabilities.get(cmd)
267 if handler:
266 if handler:
268 handler(self)
267 handler(self)
269 else:
268 else:
270 # clients are expected to check what commands are supported by
269 # clients are expected to check what commands are supported by
271 # looking at the servers capabilities
270 # looking at the servers capabilities
272 raise error.Abort(_('unknown command %s') % cmd)
271 raise error.Abort(_('unknown command %s') % cmd)
273
272
274 return cmd != ''
273 return cmd != ''
275
274
276 capabilities = {'runcommand' : runcommand,
275 capabilities = {'runcommand' : runcommand,
277 'getencoding' : getencoding}
276 'getencoding' : getencoding}
278
277
279 def serve(self):
278 def serve(self):
280 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
279 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
281 hellomsg += '\n'
280 hellomsg += '\n'
282 hellomsg += 'encoding: ' + encoding.encoding
281 hellomsg += 'encoding: ' + encoding.encoding
283 hellomsg += '\n'
282 hellomsg += '\n'
284 hellomsg += 'pid: %d' % util.getpid()
283 hellomsg += 'pid: %d' % util.getpid()
285 if util.safehasattr(os, 'getpgid'):
284 if util.safehasattr(os, 'getpgid'):
286 hellomsg += '\n'
285 hellomsg += '\n'
287 hellomsg += 'pgid: %d' % os.getpgid(0)
286 hellomsg += 'pgid: %d' % os.getpgid(0)
288
287
289 # write the hello msg in -one- chunk
288 # write the hello msg in -one- chunk
290 self.cout.write(hellomsg)
289 self.cout.write(hellomsg)
291
290
292 try:
291 try:
293 while self.serveone():
292 while self.serveone():
294 pass
293 pass
295 except EOFError:
294 except EOFError:
296 # we'll get here if the client disconnected while we were reading
295 # we'll get here if the client disconnected while we were reading
297 # its request
296 # its request
298 return 1
297 return 1
299
298
300 return 0
299 return 0
301
300
302 def _protectio(ui):
301 def _protectio(ui):
303 """ duplicates streams and redirect original to null if ui uses stdio """
302 """ duplicates streams and redirect original to null if ui uses stdio """
304 ui.flush()
303 ui.flush()
305 newfiles = []
304 newfiles = []
306 nullfd = os.open(os.devnull, os.O_RDWR)
305 nullfd = os.open(os.devnull, os.O_RDWR)
307 for f, sysf, mode in [(ui.fin, util.stdin, pycompat.sysstr('rb')),
306 for f, sysf, mode in [(ui.fin, util.stdin, pycompat.sysstr('rb')),
308 (ui.fout, util.stdout, pycompat.sysstr('wb'))]:
307 (ui.fout, util.stdout, pycompat.sysstr('wb'))]:
309 if f is sysf:
308 if f is sysf:
310 newfd = os.dup(f.fileno())
309 newfd = os.dup(f.fileno())
311 os.dup2(nullfd, f.fileno())
310 os.dup2(nullfd, f.fileno())
312 f = os.fdopen(newfd, mode)
311 f = os.fdopen(newfd, mode)
313 newfiles.append(f)
312 newfiles.append(f)
314 os.close(nullfd)
313 os.close(nullfd)
315 return tuple(newfiles)
314 return tuple(newfiles)
316
315
317 def _restoreio(ui, fin, fout):
316 def _restoreio(ui, fin, fout):
318 """ restores streams from duplicated ones """
317 """ restores streams from duplicated ones """
319 ui.flush()
318 ui.flush()
320 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
319 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
321 if f is not uif:
320 if f is not uif:
322 os.dup2(f.fileno(), uif.fileno())
321 os.dup2(f.fileno(), uif.fileno())
323 f.close()
322 f.close()
324
323
325 class pipeservice(object):
324 class pipeservice(object):
326 def __init__(self, ui, repo, opts):
325 def __init__(self, ui, repo, opts):
327 self.ui = ui
326 self.ui = ui
328 self.repo = repo
327 self.repo = repo
329
328
330 def init(self):
329 def init(self):
331 pass
330 pass
332
331
333 def run(self):
332 def run(self):
334 ui = self.ui
333 ui = self.ui
335 # redirect stdio to null device so that broken extensions or in-process
334 # redirect stdio to null device so that broken extensions or in-process
336 # hooks will never cause corruption of channel protocol.
335 # hooks will never cause corruption of channel protocol.
337 fin, fout = _protectio(ui)
336 fin, fout = _protectio(ui)
338 try:
337 try:
339 sv = server(ui, self.repo, fin, fout)
338 sv = server(ui, self.repo, fin, fout)
340 return sv.serve()
339 return sv.serve()
341 finally:
340 finally:
342 sv.cleanup()
341 sv.cleanup()
343 _restoreio(ui, fin, fout)
342 _restoreio(ui, fin, fout)
344
343
345 def _initworkerprocess():
344 def _initworkerprocess():
346 # use a different process group from the master process, in order to:
345 # use a different process group from the master process, in order to:
347 # 1. make the current process group no longer "orphaned" (because the
346 # 1. make the current process group no longer "orphaned" (because the
348 # parent of this process is in a different process group while
347 # parent of this process is in a different process group while
349 # remains in a same session)
348 # remains in a same session)
350 # according to POSIX 2.2.2.52, orphaned process group will ignore
349 # according to POSIX 2.2.2.52, orphaned process group will ignore
351 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
350 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
352 # cause trouble for things like ncurses.
351 # cause trouble for things like ncurses.
353 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
352 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
354 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
353 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
355 # processes like ssh will be killed properly, without affecting
354 # processes like ssh will be killed properly, without affecting
356 # unrelated processes.
355 # unrelated processes.
357 os.setpgid(0, 0)
356 os.setpgid(0, 0)
358 # change random state otherwise forked request handlers would have a
357 # change random state otherwise forked request handlers would have a
359 # same state inherited from parent.
358 # same state inherited from parent.
360 random.seed()
359 random.seed()
361
360
362 def _serverequest(ui, repo, conn, createcmdserver):
361 def _serverequest(ui, repo, conn, createcmdserver):
363 fin = conn.makefile('rb')
362 fin = conn.makefile('rb')
364 fout = conn.makefile('wb')
363 fout = conn.makefile('wb')
365 sv = None
364 sv = None
366 try:
365 try:
367 sv = createcmdserver(repo, conn, fin, fout)
366 sv = createcmdserver(repo, conn, fin, fout)
368 try:
367 try:
369 sv.serve()
368 sv.serve()
370 # handle exceptions that may be raised by command server. most of
369 # handle exceptions that may be raised by command server. most of
371 # known exceptions are caught by dispatch.
370 # known exceptions are caught by dispatch.
372 except error.Abort as inst:
371 except error.Abort as inst:
373 ui.warn(_('abort: %s\n') % inst)
372 ui.warn(_('abort: %s\n') % inst)
374 except IOError as inst:
373 except IOError as inst:
375 if inst.errno != errno.EPIPE:
374 if inst.errno != errno.EPIPE:
376 raise
375 raise
377 except KeyboardInterrupt:
376 except KeyboardInterrupt:
378 pass
377 pass
379 finally:
378 finally:
380 sv.cleanup()
379 sv.cleanup()
381 except: # re-raises
380 except: # re-raises
382 # also write traceback to error channel. otherwise client cannot
381 # also write traceback to error channel. otherwise client cannot
383 # see it because it is written to server's stderr by default.
382 # see it because it is written to server's stderr by default.
384 if sv:
383 if sv:
385 cerr = sv.cerr
384 cerr = sv.cerr
386 else:
385 else:
387 cerr = channeledoutput(fout, 'e')
386 cerr = channeledoutput(fout, 'e')
388 traceback.print_exc(file=cerr)
387 traceback.print_exc(file=cerr)
389 raise
388 raise
390 finally:
389 finally:
391 fin.close()
390 fin.close()
392 try:
391 try:
393 fout.close() # implicit flush() may cause another EPIPE
392 fout.close() # implicit flush() may cause another EPIPE
394 except IOError as inst:
393 except IOError as inst:
395 if inst.errno != errno.EPIPE:
394 if inst.errno != errno.EPIPE:
396 raise
395 raise
397
396
398 class unixservicehandler(object):
397 class unixservicehandler(object):
399 """Set of pluggable operations for unix-mode services
398 """Set of pluggable operations for unix-mode services
400
399
401 Almost all methods except for createcmdserver() are called in the main
400 Almost all methods except for createcmdserver() are called in the main
402 process. You can't pass mutable resource back from createcmdserver().
401 process. You can't pass mutable resource back from createcmdserver().
403 """
402 """
404
403
405 pollinterval = None
404 pollinterval = None
406
405
407 def __init__(self, ui):
406 def __init__(self, ui):
408 self.ui = ui
407 self.ui = ui
409
408
410 def bindsocket(self, sock, address):
409 def bindsocket(self, sock, address):
411 util.bindunixsocket(sock, address)
410 util.bindunixsocket(sock, address)
412 sock.listen(socket.SOMAXCONN)
411 sock.listen(socket.SOMAXCONN)
413 self.ui.status(_('listening at %s\n') % address)
412 self.ui.status(_('listening at %s\n') % address)
414 self.ui.flush() # avoid buffering of status message
413 self.ui.flush() # avoid buffering of status message
415
414
416 def unlinksocket(self, address):
415 def unlinksocket(self, address):
417 os.unlink(address)
416 os.unlink(address)
418
417
419 def shouldexit(self):
418 def shouldexit(self):
420 """True if server should shut down; checked per pollinterval"""
419 """True if server should shut down; checked per pollinterval"""
421 return False
420 return False
422
421
423 def newconnection(self):
422 def newconnection(self):
424 """Called when main process notices new connection"""
423 """Called when main process notices new connection"""
425 pass
426
424
427 def createcmdserver(self, repo, conn, fin, fout):
425 def createcmdserver(self, repo, conn, fin, fout):
428 """Create new command server instance; called in the process that
426 """Create new command server instance; called in the process that
429 serves for the current connection"""
427 serves for the current connection"""
430 return server(self.ui, repo, fin, fout)
428 return server(self.ui, repo, fin, fout)
431
429
432 class unixforkingservice(object):
430 class unixforkingservice(object):
433 """
431 """
434 Listens on unix domain socket and forks server per connection
432 Listens on unix domain socket and forks server per connection
435 """
433 """
436
434
437 def __init__(self, ui, repo, opts, handler=None):
435 def __init__(self, ui, repo, opts, handler=None):
438 self.ui = ui
436 self.ui = ui
439 self.repo = repo
437 self.repo = repo
440 self.address = opts['address']
438 self.address = opts['address']
441 if not util.safehasattr(socket, 'AF_UNIX'):
439 if not util.safehasattr(socket, 'AF_UNIX'):
442 raise error.Abort(_('unsupported platform'))
440 raise error.Abort(_('unsupported platform'))
443 if not self.address:
441 if not self.address:
444 raise error.Abort(_('no socket path specified with --address'))
442 raise error.Abort(_('no socket path specified with --address'))
445 self._servicehandler = handler or unixservicehandler(ui)
443 self._servicehandler = handler or unixservicehandler(ui)
446 self._sock = None
444 self._sock = None
447 self._oldsigchldhandler = None
445 self._oldsigchldhandler = None
448 self._workerpids = set() # updated by signal handler; do not iterate
446 self._workerpids = set() # updated by signal handler; do not iterate
449 self._socketunlinked = None
447 self._socketunlinked = None
450
448
451 def init(self):
449 def init(self):
452 self._sock = socket.socket(socket.AF_UNIX)
450 self._sock = socket.socket(socket.AF_UNIX)
453 self._servicehandler.bindsocket(self._sock, self.address)
451 self._servicehandler.bindsocket(self._sock, self.address)
454 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
452 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
455 self._oldsigchldhandler = o
453 self._oldsigchldhandler = o
456 self._socketunlinked = False
454 self._socketunlinked = False
457
455
458 def _unlinksocket(self):
456 def _unlinksocket(self):
459 if not self._socketunlinked:
457 if not self._socketunlinked:
460 self._servicehandler.unlinksocket(self.address)
458 self._servicehandler.unlinksocket(self.address)
461 self._socketunlinked = True
459 self._socketunlinked = True
462
460
463 def _cleanup(self):
461 def _cleanup(self):
464 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
462 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
465 self._sock.close()
463 self._sock.close()
466 self._unlinksocket()
464 self._unlinksocket()
467 # don't kill child processes as they have active clients, just wait
465 # don't kill child processes as they have active clients, just wait
468 self._reapworkers(0)
466 self._reapworkers(0)
469
467
470 def run(self):
468 def run(self):
471 try:
469 try:
472 self._mainloop()
470 self._mainloop()
473 finally:
471 finally:
474 self._cleanup()
472 self._cleanup()
475
473
476 def _mainloop(self):
474 def _mainloop(self):
477 exiting = False
475 exiting = False
478 h = self._servicehandler
476 h = self._servicehandler
479 selector = selectors2.DefaultSelector()
477 selector = selectors2.DefaultSelector()
480 selector.register(self._sock, selectors2.EVENT_READ)
478 selector.register(self._sock, selectors2.EVENT_READ)
481 while True:
479 while True:
482 if not exiting and h.shouldexit():
480 if not exiting and h.shouldexit():
483 # clients can no longer connect() to the domain socket, so
481 # clients can no longer connect() to the domain socket, so
484 # we stop queuing new requests.
482 # we stop queuing new requests.
485 # for requests that are queued (connect()-ed, but haven't been
483 # for requests that are queued (connect()-ed, but haven't been
486 # accept()-ed), handle them before exit. otherwise, clients
484 # accept()-ed), handle them before exit. otherwise, clients
487 # waiting for recv() will receive ECONNRESET.
485 # waiting for recv() will receive ECONNRESET.
488 self._unlinksocket()
486 self._unlinksocket()
489 exiting = True
487 exiting = True
490 ready = selector.select(timeout=h.pollinterval)
488 ready = selector.select(timeout=h.pollinterval)
491 if not ready:
489 if not ready:
492 # only exit if we completed all queued requests
490 # only exit if we completed all queued requests
493 if exiting:
491 if exiting:
494 break
492 break
495 continue
493 continue
496 try:
494 try:
497 conn, _addr = self._sock.accept()
495 conn, _addr = self._sock.accept()
498 except socket.error as inst:
496 except socket.error as inst:
499 if inst.args[0] == errno.EINTR:
497 if inst.args[0] == errno.EINTR:
500 continue
498 continue
501 raise
499 raise
502
500
503 pid = os.fork()
501 pid = os.fork()
504 if pid:
502 if pid:
505 try:
503 try:
506 self.ui.debug('forked worker process (pid=%d)\n' % pid)
504 self.ui.debug('forked worker process (pid=%d)\n' % pid)
507 self._workerpids.add(pid)
505 self._workerpids.add(pid)
508 h.newconnection()
506 h.newconnection()
509 finally:
507 finally:
510 conn.close() # release handle in parent process
508 conn.close() # release handle in parent process
511 else:
509 else:
512 try:
510 try:
513 self._runworker(conn)
511 self._runworker(conn)
514 conn.close()
512 conn.close()
515 os._exit(0)
513 os._exit(0)
516 except: # never return, hence no re-raises
514 except: # never return, hence no re-raises
517 try:
515 try:
518 self.ui.traceback(force=True)
516 self.ui.traceback(force=True)
519 finally:
517 finally:
520 os._exit(255)
518 os._exit(255)
521 selector.close()
519 selector.close()
522
520
523 def _sigchldhandler(self, signal, frame):
521 def _sigchldhandler(self, signal, frame):
524 self._reapworkers(os.WNOHANG)
522 self._reapworkers(os.WNOHANG)
525
523
526 def _reapworkers(self, options):
524 def _reapworkers(self, options):
527 while self._workerpids:
525 while self._workerpids:
528 try:
526 try:
529 pid, _status = os.waitpid(-1, options)
527 pid, _status = os.waitpid(-1, options)
530 except OSError as inst:
528 except OSError as inst:
531 if inst.errno == errno.EINTR:
529 if inst.errno == errno.EINTR:
532 continue
530 continue
533 if inst.errno != errno.ECHILD:
531 if inst.errno != errno.ECHILD:
534 raise
532 raise
535 # no child processes at all (reaped by other waitpid()?)
533 # no child processes at all (reaped by other waitpid()?)
536 self._workerpids.clear()
534 self._workerpids.clear()
537 return
535 return
538 if pid == 0:
536 if pid == 0:
539 # no waitable child processes
537 # no waitable child processes
540 return
538 return
541 self.ui.debug('worker process exited (pid=%d)\n' % pid)
539 self.ui.debug('worker process exited (pid=%d)\n' % pid)
542 self._workerpids.discard(pid)
540 self._workerpids.discard(pid)
543
541
544 def _runworker(self, conn):
542 def _runworker(self, conn):
545 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
543 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
546 _initworkerprocess()
544 _initworkerprocess()
547 h = self._servicehandler
545 h = self._servicehandler
548 try:
546 try:
549 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
547 _serverequest(self.ui, self.repo, conn, h.createcmdserver)
550 finally:
548 finally:
551 gc.collect() # trigger __del__ since worker process uses os._exit
549 gc.collect() # trigger __del__ since worker process uses os._exit
General Comments 0
You need to be logged in to leave comments. Login now