##// END OF EJS Templates
darwin: add another preemptive gui() call when using chg...
Kyle Lippincott -
r44850:6392bd7c default
parent child Browse files
Show More
@@ -1,727 +1,731 b''
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Matt Mackall <mpm@selenic.com>
3 # Copyright Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import gc
11 import gc
12 import os
12 import os
13 import random
13 import random
14 import signal
14 import signal
15 import socket
15 import socket
16 import struct
16 import struct
17 import traceback
17 import traceback
18
18
19 try:
19 try:
20 import selectors
20 import selectors
21
21
22 selectors.BaseSelector
22 selectors.BaseSelector
23 except ImportError:
23 except ImportError:
24 from .thirdparty import selectors2 as selectors
24 from .thirdparty import selectors2 as selectors
25
25
26 from .i18n import _
26 from .i18n import _
27 from .pycompat import getattr
27 from .pycompat import getattr
28 from . import (
28 from . import (
29 encoding,
29 encoding,
30 error,
30 error,
31 loggingutil,
31 loggingutil,
32 pycompat,
32 pycompat,
33 repocache,
33 repocache,
34 util,
34 util,
35 vfs as vfsmod,
35 vfs as vfsmod,
36 )
36 )
37 from .utils import (
37 from .utils import (
38 cborutil,
38 cborutil,
39 procutil,
39 procutil,
40 )
40 )
41
41
42
42
43 class channeledoutput(object):
43 class channeledoutput(object):
44 """
44 """
45 Write data to out in the following format:
45 Write data to out in the following format:
46
46
47 data length (unsigned int),
47 data length (unsigned int),
48 data
48 data
49 """
49 """
50
50
51 def __init__(self, out, channel):
51 def __init__(self, out, channel):
52 self.out = out
52 self.out = out
53 self.channel = channel
53 self.channel = channel
54
54
55 @property
55 @property
56 def name(self):
56 def name(self):
57 return b'<%c-channel>' % self.channel
57 return b'<%c-channel>' % self.channel
58
58
59 def write(self, data):
59 def write(self, data):
60 if not data:
60 if not data:
61 return
61 return
62 # single write() to guarantee the same atomicity as the underlying file
62 # single write() to guarantee the same atomicity as the underlying file
63 self.out.write(struct.pack(b'>cI', self.channel, len(data)) + data)
63 self.out.write(struct.pack(b'>cI', self.channel, len(data)) + data)
64 self.out.flush()
64 self.out.flush()
65
65
66 def __getattr__(self, attr):
66 def __getattr__(self, attr):
67 if attr in ('isatty', 'fileno', 'tell', 'seek'):
67 if attr in ('isatty', 'fileno', 'tell', 'seek'):
68 raise AttributeError(attr)
68 raise AttributeError(attr)
69 return getattr(self.out, attr)
69 return getattr(self.out, attr)
70
70
71
71
72 class channeledmessage(object):
72 class channeledmessage(object):
73 """
73 """
74 Write encoded message and metadata to out in the following format:
74 Write encoded message and metadata to out in the following format:
75
75
76 data length (unsigned int),
76 data length (unsigned int),
77 encoded message and metadata, as a flat key-value dict.
77 encoded message and metadata, as a flat key-value dict.
78
78
79 Each message should have 'type' attribute. Messages of unknown type
79 Each message should have 'type' attribute. Messages of unknown type
80 should be ignored.
80 should be ignored.
81 """
81 """
82
82
83 # teach ui that write() can take **opts
83 # teach ui that write() can take **opts
84 structured = True
84 structured = True
85
85
86 def __init__(self, out, channel, encodename, encodefn):
86 def __init__(self, out, channel, encodename, encodefn):
87 self._cout = channeledoutput(out, channel)
87 self._cout = channeledoutput(out, channel)
88 self.encoding = encodename
88 self.encoding = encodename
89 self._encodefn = encodefn
89 self._encodefn = encodefn
90
90
91 def write(self, data, **opts):
91 def write(self, data, **opts):
92 opts = pycompat.byteskwargs(opts)
92 opts = pycompat.byteskwargs(opts)
93 if data is not None:
93 if data is not None:
94 opts[b'data'] = data
94 opts[b'data'] = data
95 self._cout.write(self._encodefn(opts))
95 self._cout.write(self._encodefn(opts))
96
96
97 def __getattr__(self, attr):
97 def __getattr__(self, attr):
98 return getattr(self._cout, attr)
98 return getattr(self._cout, attr)
99
99
100
100
101 class channeledinput(object):
101 class channeledinput(object):
102 """
102 """
103 Read data from in_.
103 Read data from in_.
104
104
105 Requests for input are written to out in the following format:
105 Requests for input are written to out in the following format:
106 channel identifier - 'I' for plain input, 'L' line based (1 byte)
106 channel identifier - 'I' for plain input, 'L' line based (1 byte)
107 how many bytes to send at most (unsigned int),
107 how many bytes to send at most (unsigned int),
108
108
109 The client replies with:
109 The client replies with:
110 data length (unsigned int), 0 meaning EOF
110 data length (unsigned int), 0 meaning EOF
111 data
111 data
112 """
112 """
113
113
114 maxchunksize = 4 * 1024
114 maxchunksize = 4 * 1024
115
115
116 def __init__(self, in_, out, channel):
116 def __init__(self, in_, out, channel):
117 self.in_ = in_
117 self.in_ = in_
118 self.out = out
118 self.out = out
119 self.channel = channel
119 self.channel = channel
120
120
121 @property
121 @property
122 def name(self):
122 def name(self):
123 return b'<%c-channel>' % self.channel
123 return b'<%c-channel>' % self.channel
124
124
125 def read(self, size=-1):
125 def read(self, size=-1):
126 if size < 0:
126 if size < 0:
127 # if we need to consume all the clients input, ask for 4k chunks
127 # if we need to consume all the clients input, ask for 4k chunks
128 # so the pipe doesn't fill up risking a deadlock
128 # so the pipe doesn't fill up risking a deadlock
129 size = self.maxchunksize
129 size = self.maxchunksize
130 s = self._read(size, self.channel)
130 s = self._read(size, self.channel)
131 buf = s
131 buf = s
132 while s:
132 while s:
133 s = self._read(size, self.channel)
133 s = self._read(size, self.channel)
134 buf += s
134 buf += s
135
135
136 return buf
136 return buf
137 else:
137 else:
138 return self._read(size, self.channel)
138 return self._read(size, self.channel)
139
139
140 def _read(self, size, channel):
140 def _read(self, size, channel):
141 if not size:
141 if not size:
142 return b''
142 return b''
143 assert size > 0
143 assert size > 0
144
144
145 # tell the client we need at most size bytes
145 # tell the client we need at most size bytes
146 self.out.write(struct.pack(b'>cI', channel, size))
146 self.out.write(struct.pack(b'>cI', channel, size))
147 self.out.flush()
147 self.out.flush()
148
148
149 length = self.in_.read(4)
149 length = self.in_.read(4)
150 length = struct.unpack(b'>I', length)[0]
150 length = struct.unpack(b'>I', length)[0]
151 if not length:
151 if not length:
152 return b''
152 return b''
153 else:
153 else:
154 return self.in_.read(length)
154 return self.in_.read(length)
155
155
156 def readline(self, size=-1):
156 def readline(self, size=-1):
157 if size < 0:
157 if size < 0:
158 size = self.maxchunksize
158 size = self.maxchunksize
159 s = self._read(size, b'L')
159 s = self._read(size, b'L')
160 buf = s
160 buf = s
161 # keep asking for more until there's either no more or
161 # keep asking for more until there's either no more or
162 # we got a full line
162 # we got a full line
163 while s and s[-1] != b'\n':
163 while s and s[-1] != b'\n':
164 s = self._read(size, b'L')
164 s = self._read(size, b'L')
165 buf += s
165 buf += s
166
166
167 return buf
167 return buf
168 else:
168 else:
169 return self._read(size, b'L')
169 return self._read(size, b'L')
170
170
171 def __iter__(self):
171 def __iter__(self):
172 return self
172 return self
173
173
174 def next(self):
174 def next(self):
175 l = self.readline()
175 l = self.readline()
176 if not l:
176 if not l:
177 raise StopIteration
177 raise StopIteration
178 return l
178 return l
179
179
180 __next__ = next
180 __next__ = next
181
181
182 def __getattr__(self, attr):
182 def __getattr__(self, attr):
183 if attr in ('isatty', 'fileno', 'tell', 'seek'):
183 if attr in ('isatty', 'fileno', 'tell', 'seek'):
184 raise AttributeError(attr)
184 raise AttributeError(attr)
185 return getattr(self.in_, attr)
185 return getattr(self.in_, attr)
186
186
187
187
188 _messageencoders = {
188 _messageencoders = {
189 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
189 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
190 }
190 }
191
191
192
192
193 def _selectmessageencoder(ui):
193 def _selectmessageencoder(ui):
194 # experimental config: cmdserver.message-encodings
194 # experimental config: cmdserver.message-encodings
195 encnames = ui.configlist(b'cmdserver', b'message-encodings')
195 encnames = ui.configlist(b'cmdserver', b'message-encodings')
196 for n in encnames:
196 for n in encnames:
197 f = _messageencoders.get(n)
197 f = _messageencoders.get(n)
198 if f:
198 if f:
199 return n, f
199 return n, f
200 raise error.Abort(
200 raise error.Abort(
201 b'no supported message encodings: %s' % b' '.join(encnames)
201 b'no supported message encodings: %s' % b' '.join(encnames)
202 )
202 )
203
203
204
204
205 class server(object):
205 class server(object):
206 """
206 """
207 Listens for commands on fin, runs them and writes the output on a channel
207 Listens for commands on fin, runs them and writes the output on a channel
208 based stream to fout.
208 based stream to fout.
209 """
209 """
210
210
211 def __init__(self, ui, repo, fin, fout, prereposetups=None):
211 def __init__(self, ui, repo, fin, fout, prereposetups=None):
212 self.cwd = encoding.getcwd()
212 self.cwd = encoding.getcwd()
213
213
214 if repo:
214 if repo:
215 # the ui here is really the repo ui so take its baseui so we don't
215 # the ui here is really the repo ui so take its baseui so we don't
216 # end up with its local configuration
216 # end up with its local configuration
217 self.ui = repo.baseui
217 self.ui = repo.baseui
218 self.repo = repo
218 self.repo = repo
219 self.repoui = repo.ui
219 self.repoui = repo.ui
220 else:
220 else:
221 self.ui = ui
221 self.ui = ui
222 self.repo = self.repoui = None
222 self.repo = self.repoui = None
223 self._prereposetups = prereposetups
223 self._prereposetups = prereposetups
224
224
225 self.cdebug = channeledoutput(fout, b'd')
225 self.cdebug = channeledoutput(fout, b'd')
226 self.cerr = channeledoutput(fout, b'e')
226 self.cerr = channeledoutput(fout, b'e')
227 self.cout = channeledoutput(fout, b'o')
227 self.cout = channeledoutput(fout, b'o')
228 self.cin = channeledinput(fin, fout, b'I')
228 self.cin = channeledinput(fin, fout, b'I')
229 self.cresult = channeledoutput(fout, b'r')
229 self.cresult = channeledoutput(fout, b'r')
230
230
231 if self.ui.config(b'cmdserver', b'log') == b'-':
231 if self.ui.config(b'cmdserver', b'log') == b'-':
232 # switch log stream of server's ui to the 'd' (debug) channel
232 # switch log stream of server's ui to the 'd' (debug) channel
233 # (don't touch repo.ui as its lifetime is longer than the server)
233 # (don't touch repo.ui as its lifetime is longer than the server)
234 self.ui = self.ui.copy()
234 self.ui = self.ui.copy()
235 setuplogging(self.ui, repo=None, fp=self.cdebug)
235 setuplogging(self.ui, repo=None, fp=self.cdebug)
236
236
237 # TODO: add this to help/config.txt when stabilized
237 # TODO: add this to help/config.txt when stabilized
238 # ``channel``
238 # ``channel``
239 # Use separate channel for structured output. (Command-server only)
239 # Use separate channel for structured output. (Command-server only)
240 self.cmsg = None
240 self.cmsg = None
241 if ui.config(b'ui', b'message-output') == b'channel':
241 if ui.config(b'ui', b'message-output') == b'channel':
242 encname, encfn = _selectmessageencoder(ui)
242 encname, encfn = _selectmessageencoder(ui)
243 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
243 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
244
244
245 self.client = fin
245 self.client = fin
246
246
247 def cleanup(self):
247 def cleanup(self):
248 """release and restore resources taken during server session"""
248 """release and restore resources taken during server session"""
249
249
250 def _read(self, size):
250 def _read(self, size):
251 if not size:
251 if not size:
252 return b''
252 return b''
253
253
254 data = self.client.read(size)
254 data = self.client.read(size)
255
255
256 # is the other end closed?
256 # is the other end closed?
257 if not data:
257 if not data:
258 raise EOFError
258 raise EOFError
259
259
260 return data
260 return data
261
261
262 def _readstr(self):
262 def _readstr(self):
263 """read a string from the channel
263 """read a string from the channel
264
264
265 format:
265 format:
266 data length (uint32), data
266 data length (uint32), data
267 """
267 """
268 length = struct.unpack(b'>I', self._read(4))[0]
268 length = struct.unpack(b'>I', self._read(4))[0]
269 if not length:
269 if not length:
270 return b''
270 return b''
271 return self._read(length)
271 return self._read(length)
272
272
273 def _readlist(self):
273 def _readlist(self):
274 """read a list of NULL separated strings from the channel"""
274 """read a list of NULL separated strings from the channel"""
275 s = self._readstr()
275 s = self._readstr()
276 if s:
276 if s:
277 return s.split(b'\0')
277 return s.split(b'\0')
278 else:
278 else:
279 return []
279 return []
280
280
281 def runcommand(self):
281 def runcommand(self):
282 """ reads a list of \0 terminated arguments, executes
282 """ reads a list of \0 terminated arguments, executes
283 and writes the return code to the result channel """
283 and writes the return code to the result channel """
284 from . import dispatch # avoid cycle
284 from . import dispatch # avoid cycle
285
285
286 args = self._readlist()
286 args = self._readlist()
287
287
288 # copy the uis so changes (e.g. --config or --verbose) don't
288 # copy the uis so changes (e.g. --config or --verbose) don't
289 # persist between requests
289 # persist between requests
290 copiedui = self.ui.copy()
290 copiedui = self.ui.copy()
291 uis = [copiedui]
291 uis = [copiedui]
292 if self.repo:
292 if self.repo:
293 self.repo.baseui = copiedui
293 self.repo.baseui = copiedui
294 # clone ui without using ui.copy because this is protected
294 # clone ui without using ui.copy because this is protected
295 repoui = self.repoui.__class__(self.repoui)
295 repoui = self.repoui.__class__(self.repoui)
296 repoui.copy = copiedui.copy # redo copy protection
296 repoui.copy = copiedui.copy # redo copy protection
297 uis.append(repoui)
297 uis.append(repoui)
298 self.repo.ui = self.repo.dirstate._ui = repoui
298 self.repo.ui = self.repo.dirstate._ui = repoui
299 self.repo.invalidateall()
299 self.repo.invalidateall()
300
300
301 for ui in uis:
301 for ui in uis:
302 ui.resetstate()
302 ui.resetstate()
303 # any kind of interaction must use server channels, but chg may
303 # any kind of interaction must use server channels, but chg may
304 # replace channels by fully functional tty files. so nontty is
304 # replace channels by fully functional tty files. so nontty is
305 # enforced only if cin is a channel.
305 # enforced only if cin is a channel.
306 if not util.safehasattr(self.cin, b'fileno'):
306 if not util.safehasattr(self.cin, b'fileno'):
307 ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')
307 ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')
308
308
309 req = dispatch.request(
309 req = dispatch.request(
310 args[:],
310 args[:],
311 copiedui,
311 copiedui,
312 self.repo,
312 self.repo,
313 self.cin,
313 self.cin,
314 self.cout,
314 self.cout,
315 self.cerr,
315 self.cerr,
316 self.cmsg,
316 self.cmsg,
317 prereposetups=self._prereposetups,
317 prereposetups=self._prereposetups,
318 )
318 )
319
319
320 try:
320 try:
321 ret = dispatch.dispatch(req) & 255
321 ret = dispatch.dispatch(req) & 255
322 self.cresult.write(struct.pack(b'>i', int(ret)))
322 self.cresult.write(struct.pack(b'>i', int(ret)))
323 finally:
323 finally:
324 # restore old cwd
324 # restore old cwd
325 if b'--cwd' in args:
325 if b'--cwd' in args:
326 os.chdir(self.cwd)
326 os.chdir(self.cwd)
327
327
328 def getencoding(self):
328 def getencoding(self):
329 """ writes the current encoding to the result channel """
329 """ writes the current encoding to the result channel """
330 self.cresult.write(encoding.encoding)
330 self.cresult.write(encoding.encoding)
331
331
332 def serveone(self):
332 def serveone(self):
333 cmd = self.client.readline()[:-1]
333 cmd = self.client.readline()[:-1]
334 if cmd:
334 if cmd:
335 handler = self.capabilities.get(cmd)
335 handler = self.capabilities.get(cmd)
336 if handler:
336 if handler:
337 handler(self)
337 handler(self)
338 else:
338 else:
339 # clients are expected to check what commands are supported by
339 # clients are expected to check what commands are supported by
340 # looking at the servers capabilities
340 # looking at the servers capabilities
341 raise error.Abort(_(b'unknown command %s') % cmd)
341 raise error.Abort(_(b'unknown command %s') % cmd)
342
342
343 return cmd != b''
343 return cmd != b''
344
344
345 capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}
345 capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}
346
346
347 def serve(self):
347 def serve(self):
348 hellomsg = b'capabilities: ' + b' '.join(sorted(self.capabilities))
348 hellomsg = b'capabilities: ' + b' '.join(sorted(self.capabilities))
349 hellomsg += b'\n'
349 hellomsg += b'\n'
350 hellomsg += b'encoding: ' + encoding.encoding
350 hellomsg += b'encoding: ' + encoding.encoding
351 hellomsg += b'\n'
351 hellomsg += b'\n'
352 if self.cmsg:
352 if self.cmsg:
353 hellomsg += b'message-encoding: %s\n' % self.cmsg.encoding
353 hellomsg += b'message-encoding: %s\n' % self.cmsg.encoding
354 hellomsg += b'pid: %d' % procutil.getpid()
354 hellomsg += b'pid: %d' % procutil.getpid()
355 if util.safehasattr(os, b'getpgid'):
355 if util.safehasattr(os, b'getpgid'):
356 hellomsg += b'\n'
356 hellomsg += b'\n'
357 hellomsg += b'pgid: %d' % os.getpgid(0)
357 hellomsg += b'pgid: %d' % os.getpgid(0)
358
358
359 # write the hello msg in -one- chunk
359 # write the hello msg in -one- chunk
360 self.cout.write(hellomsg)
360 self.cout.write(hellomsg)
361
361
362 try:
362 try:
363 while self.serveone():
363 while self.serveone():
364 pass
364 pass
365 except EOFError:
365 except EOFError:
366 # we'll get here if the client disconnected while we were reading
366 # we'll get here if the client disconnected while we were reading
367 # its request
367 # its request
368 return 1
368 return 1
369
369
370 return 0
370 return 0
371
371
372
372
373 def setuplogging(ui, repo=None, fp=None):
373 def setuplogging(ui, repo=None, fp=None):
374 """Set up server logging facility
374 """Set up server logging facility
375
375
376 If cmdserver.log is '-', log messages will be sent to the given fp.
376 If cmdserver.log is '-', log messages will be sent to the given fp.
377 It should be the 'd' channel while a client is connected, and otherwise
377 It should be the 'd' channel while a client is connected, and otherwise
378 is the stderr of the server process.
378 is the stderr of the server process.
379 """
379 """
380 # developer config: cmdserver.log
380 # developer config: cmdserver.log
381 logpath = ui.config(b'cmdserver', b'log')
381 logpath = ui.config(b'cmdserver', b'log')
382 if not logpath:
382 if not logpath:
383 return
383 return
384 # developer config: cmdserver.track-log
384 # developer config: cmdserver.track-log
385 tracked = set(ui.configlist(b'cmdserver', b'track-log'))
385 tracked = set(ui.configlist(b'cmdserver', b'track-log'))
386
386
387 if logpath == b'-' and fp:
387 if logpath == b'-' and fp:
388 logger = loggingutil.fileobjectlogger(fp, tracked)
388 logger = loggingutil.fileobjectlogger(fp, tracked)
389 elif logpath == b'-':
389 elif logpath == b'-':
390 logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
390 logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
391 else:
391 else:
392 logpath = os.path.abspath(util.expandpath(logpath))
392 logpath = os.path.abspath(util.expandpath(logpath))
393 # developer config: cmdserver.max-log-files
393 # developer config: cmdserver.max-log-files
394 maxfiles = ui.configint(b'cmdserver', b'max-log-files')
394 maxfiles = ui.configint(b'cmdserver', b'max-log-files')
395 # developer config: cmdserver.max-log-size
395 # developer config: cmdserver.max-log-size
396 maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
396 maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
397 vfs = vfsmod.vfs(os.path.dirname(logpath))
397 vfs = vfsmod.vfs(os.path.dirname(logpath))
398 logger = loggingutil.filelogger(
398 logger = loggingutil.filelogger(
399 vfs,
399 vfs,
400 os.path.basename(logpath),
400 os.path.basename(logpath),
401 tracked,
401 tracked,
402 maxfiles=maxfiles,
402 maxfiles=maxfiles,
403 maxsize=maxsize,
403 maxsize=maxsize,
404 )
404 )
405
405
406 targetuis = {ui}
406 targetuis = {ui}
407 if repo:
407 if repo:
408 targetuis.add(repo.baseui)
408 targetuis.add(repo.baseui)
409 targetuis.add(repo.ui)
409 targetuis.add(repo.ui)
410 for u in targetuis:
410 for u in targetuis:
411 u.setlogger(b'cmdserver', logger)
411 u.setlogger(b'cmdserver', logger)
412
412
413
413
414 class pipeservice(object):
414 class pipeservice(object):
415 def __init__(self, ui, repo, opts):
415 def __init__(self, ui, repo, opts):
416 self.ui = ui
416 self.ui = ui
417 self.repo = repo
417 self.repo = repo
418
418
419 def init(self):
419 def init(self):
420 pass
420 pass
421
421
422 def run(self):
422 def run(self):
423 ui = self.ui
423 ui = self.ui
424 # redirect stdio to null device so that broken extensions or in-process
424 # redirect stdio to null device so that broken extensions or in-process
425 # hooks will never cause corruption of channel protocol.
425 # hooks will never cause corruption of channel protocol.
426 with ui.protectedfinout() as (fin, fout):
426 with ui.protectedfinout() as (fin, fout):
427 sv = server(ui, self.repo, fin, fout)
427 sv = server(ui, self.repo, fin, fout)
428 try:
428 try:
429 return sv.serve()
429 return sv.serve()
430 finally:
430 finally:
431 sv.cleanup()
431 sv.cleanup()
432
432
433
433
434 def _initworkerprocess():
434 def _initworkerprocess():
435 # use a different process group from the master process, in order to:
435 # use a different process group from the master process, in order to:
436 # 1. make the current process group no longer "orphaned" (because the
436 # 1. make the current process group no longer "orphaned" (because the
437 # parent of this process is in a different process group while
437 # parent of this process is in a different process group while
438 # remains in a same session)
438 # remains in a same session)
439 # according to POSIX 2.2.2.52, orphaned process group will ignore
439 # according to POSIX 2.2.2.52, orphaned process group will ignore
440 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
440 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
441 # cause trouble for things like ncurses.
441 # cause trouble for things like ncurses.
442 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
442 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
443 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
443 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
444 # processes like ssh will be killed properly, without affecting
444 # processes like ssh will be killed properly, without affecting
445 # unrelated processes.
445 # unrelated processes.
446 os.setpgid(0, 0)
446 os.setpgid(0, 0)
447 # change random state otherwise forked request handlers would have a
447 # change random state otherwise forked request handlers would have a
448 # same state inherited from parent.
448 # same state inherited from parent.
449 random.seed()
449 random.seed()
450
450
451
451
452 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
452 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
453 fin = conn.makefile('rb')
453 fin = conn.makefile('rb')
454 fout = conn.makefile('wb')
454 fout = conn.makefile('wb')
455 sv = None
455 sv = None
456 try:
456 try:
457 sv = createcmdserver(repo, conn, fin, fout, prereposetups)
457 sv = createcmdserver(repo, conn, fin, fout, prereposetups)
458 try:
458 try:
459 sv.serve()
459 sv.serve()
460 # handle exceptions that may be raised by command server. most of
460 # handle exceptions that may be raised by command server. most of
461 # known exceptions are caught by dispatch.
461 # known exceptions are caught by dispatch.
462 except error.Abort as inst:
462 except error.Abort as inst:
463 ui.error(_(b'abort: %s\n') % inst)
463 ui.error(_(b'abort: %s\n') % inst)
464 except IOError as inst:
464 except IOError as inst:
465 if inst.errno != errno.EPIPE:
465 if inst.errno != errno.EPIPE:
466 raise
466 raise
467 except KeyboardInterrupt:
467 except KeyboardInterrupt:
468 pass
468 pass
469 finally:
469 finally:
470 sv.cleanup()
470 sv.cleanup()
471 except: # re-raises
471 except: # re-raises
472 # also write traceback to error channel. otherwise client cannot
472 # also write traceback to error channel. otherwise client cannot
473 # see it because it is written to server's stderr by default.
473 # see it because it is written to server's stderr by default.
474 if sv:
474 if sv:
475 cerr = sv.cerr
475 cerr = sv.cerr
476 else:
476 else:
477 cerr = channeledoutput(fout, b'e')
477 cerr = channeledoutput(fout, b'e')
478 cerr.write(encoding.strtolocal(traceback.format_exc()))
478 cerr.write(encoding.strtolocal(traceback.format_exc()))
479 raise
479 raise
480 finally:
480 finally:
481 fin.close()
481 fin.close()
482 try:
482 try:
483 fout.close() # implicit flush() may cause another EPIPE
483 fout.close() # implicit flush() may cause another EPIPE
484 except IOError as inst:
484 except IOError as inst:
485 if inst.errno != errno.EPIPE:
485 if inst.errno != errno.EPIPE:
486 raise
486 raise
487
487
488
488
489 class unixservicehandler(object):
489 class unixservicehandler(object):
490 """Set of pluggable operations for unix-mode services
490 """Set of pluggable operations for unix-mode services
491
491
492 Almost all methods except for createcmdserver() are called in the main
492 Almost all methods except for createcmdserver() are called in the main
493 process. You can't pass mutable resource back from createcmdserver().
493 process. You can't pass mutable resource back from createcmdserver().
494 """
494 """
495
495
496 pollinterval = None
496 pollinterval = None
497
497
498 def __init__(self, ui):
498 def __init__(self, ui):
499 self.ui = ui
499 self.ui = ui
500
500
501 def bindsocket(self, sock, address):
501 def bindsocket(self, sock, address):
502 util.bindunixsocket(sock, address)
502 util.bindunixsocket(sock, address)
503 sock.listen(socket.SOMAXCONN)
503 sock.listen(socket.SOMAXCONN)
504 self.ui.status(_(b'listening at %s\n') % address)
504 self.ui.status(_(b'listening at %s\n') % address)
505 self.ui.flush() # avoid buffering of status message
505 self.ui.flush() # avoid buffering of status message
506
506
507 def unlinksocket(self, address):
507 def unlinksocket(self, address):
508 os.unlink(address)
508 os.unlink(address)
509
509
510 def shouldexit(self):
510 def shouldexit(self):
511 """True if server should shut down; checked per pollinterval"""
511 """True if server should shut down; checked per pollinterval"""
512 return False
512 return False
513
513
514 def newconnection(self):
514 def newconnection(self):
515 """Called when main process notices new connection"""
515 """Called when main process notices new connection"""
516
516
517 def createcmdserver(self, repo, conn, fin, fout, prereposetups):
517 def createcmdserver(self, repo, conn, fin, fout, prereposetups):
518 """Create new command server instance; called in the process that
518 """Create new command server instance; called in the process that
519 serves for the current connection"""
519 serves for the current connection"""
520 return server(self.ui, repo, fin, fout, prereposetups)
520 return server(self.ui, repo, fin, fout, prereposetups)
521
521
522
522
523 class unixforkingservice(object):
523 class unixforkingservice(object):
524 """
524 """
525 Listens on unix domain socket and forks server per connection
525 Listens on unix domain socket and forks server per connection
526 """
526 """
527
527
528 def __init__(self, ui, repo, opts, handler=None):
528 def __init__(self, ui, repo, opts, handler=None):
529 self.ui = ui
529 self.ui = ui
530 self.repo = repo
530 self.repo = repo
531 self.address = opts[b'address']
531 self.address = opts[b'address']
532 if not util.safehasattr(socket, b'AF_UNIX'):
532 if not util.safehasattr(socket, b'AF_UNIX'):
533 raise error.Abort(_(b'unsupported platform'))
533 raise error.Abort(_(b'unsupported platform'))
534 if not self.address:
534 if not self.address:
535 raise error.Abort(_(b'no socket path specified with --address'))
535 raise error.Abort(_(b'no socket path specified with --address'))
536 self._servicehandler = handler or unixservicehandler(ui)
536 self._servicehandler = handler or unixservicehandler(ui)
537 self._sock = None
537 self._sock = None
538 self._mainipc = None
538 self._mainipc = None
539 self._workeripc = None
539 self._workeripc = None
540 self._oldsigchldhandler = None
540 self._oldsigchldhandler = None
541 self._workerpids = set() # updated by signal handler; do not iterate
541 self._workerpids = set() # updated by signal handler; do not iterate
542 self._socketunlinked = None
542 self._socketunlinked = None
543 # experimental config: cmdserver.max-repo-cache
543 # experimental config: cmdserver.max-repo-cache
544 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
544 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
545 if maxlen < 0:
545 if maxlen < 0:
546 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
546 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
547 self._repoloader = repocache.repoloader(ui, maxlen)
547 self._repoloader = repocache.repoloader(ui, maxlen)
548 # attempt to avoid crash in CoreFoundation when using chg after fix in
549 # a89381e04c58
550 if pycompat.isdarwin:
551 procutil.gui()
548
552
549 def init(self):
553 def init(self):
550 self._sock = socket.socket(socket.AF_UNIX)
554 self._sock = socket.socket(socket.AF_UNIX)
551 # IPC channel from many workers to one main process; this is actually
555 # IPC channel from many workers to one main process; this is actually
552 # a uni-directional pipe, but is backed by a DGRAM socket so each
556 # a uni-directional pipe, but is backed by a DGRAM socket so each
553 # message can be easily separated.
557 # message can be easily separated.
554 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
558 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
555 self._mainipc, self._workeripc = o
559 self._mainipc, self._workeripc = o
556 self._servicehandler.bindsocket(self._sock, self.address)
560 self._servicehandler.bindsocket(self._sock, self.address)
557 if util.safehasattr(procutil, b'unblocksignal'):
561 if util.safehasattr(procutil, b'unblocksignal'):
558 procutil.unblocksignal(signal.SIGCHLD)
562 procutil.unblocksignal(signal.SIGCHLD)
559 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
563 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
560 self._oldsigchldhandler = o
564 self._oldsigchldhandler = o
561 self._socketunlinked = False
565 self._socketunlinked = False
562 self._repoloader.start()
566 self._repoloader.start()
563
567
564 def _unlinksocket(self):
568 def _unlinksocket(self):
565 if not self._socketunlinked:
569 if not self._socketunlinked:
566 self._servicehandler.unlinksocket(self.address)
570 self._servicehandler.unlinksocket(self.address)
567 self._socketunlinked = True
571 self._socketunlinked = True
568
572
569 def _cleanup(self):
573 def _cleanup(self):
570 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
574 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
571 self._sock.close()
575 self._sock.close()
572 self._mainipc.close()
576 self._mainipc.close()
573 self._workeripc.close()
577 self._workeripc.close()
574 self._unlinksocket()
578 self._unlinksocket()
575 self._repoloader.stop()
579 self._repoloader.stop()
576 # don't kill child processes as they have active clients, just wait
580 # don't kill child processes as they have active clients, just wait
577 self._reapworkers(0)
581 self._reapworkers(0)
578
582
579 def run(self):
583 def run(self):
580 try:
584 try:
581 self._mainloop()
585 self._mainloop()
582 finally:
586 finally:
583 self._cleanup()
587 self._cleanup()
584
588
585 def _mainloop(self):
589 def _mainloop(self):
586 exiting = False
590 exiting = False
587 h = self._servicehandler
591 h = self._servicehandler
588 selector = selectors.DefaultSelector()
592 selector = selectors.DefaultSelector()
589 selector.register(
593 selector.register(
590 self._sock, selectors.EVENT_READ, self._acceptnewconnection
594 self._sock, selectors.EVENT_READ, self._acceptnewconnection
591 )
595 )
592 selector.register(
596 selector.register(
593 self._mainipc, selectors.EVENT_READ, self._handlemainipc
597 self._mainipc, selectors.EVENT_READ, self._handlemainipc
594 )
598 )
595 while True:
599 while True:
596 if not exiting and h.shouldexit():
600 if not exiting and h.shouldexit():
597 # clients can no longer connect() to the domain socket, so
601 # clients can no longer connect() to the domain socket, so
598 # we stop queuing new requests.
602 # we stop queuing new requests.
599 # for requests that are queued (connect()-ed, but haven't been
603 # for requests that are queued (connect()-ed, but haven't been
600 # accept()-ed), handle them before exit. otherwise, clients
604 # accept()-ed), handle them before exit. otherwise, clients
601 # waiting for recv() will receive ECONNRESET.
605 # waiting for recv() will receive ECONNRESET.
602 self._unlinksocket()
606 self._unlinksocket()
603 exiting = True
607 exiting = True
604 try:
608 try:
605 events = selector.select(timeout=h.pollinterval)
609 events = selector.select(timeout=h.pollinterval)
606 except OSError as inst:
610 except OSError as inst:
607 # selectors2 raises ETIMEDOUT if timeout exceeded while
611 # selectors2 raises ETIMEDOUT if timeout exceeded while
608 # handling signal interrupt. That's probably wrong, but
612 # handling signal interrupt. That's probably wrong, but
609 # we can easily get around it.
613 # we can easily get around it.
610 if inst.errno != errno.ETIMEDOUT:
614 if inst.errno != errno.ETIMEDOUT:
611 raise
615 raise
612 events = []
616 events = []
613 if not events:
617 if not events:
614 # only exit if we completed all queued requests
618 # only exit if we completed all queued requests
615 if exiting:
619 if exiting:
616 break
620 break
617 continue
621 continue
618 for key, _mask in events:
622 for key, _mask in events:
619 key.data(key.fileobj, selector)
623 key.data(key.fileobj, selector)
620 selector.close()
624 selector.close()
621
625
622 def _acceptnewconnection(self, sock, selector):
626 def _acceptnewconnection(self, sock, selector):
623 h = self._servicehandler
627 h = self._servicehandler
624 try:
628 try:
625 conn, _addr = sock.accept()
629 conn, _addr = sock.accept()
626 except socket.error as inst:
630 except socket.error as inst:
627 if inst.args[0] == errno.EINTR:
631 if inst.args[0] == errno.EINTR:
628 return
632 return
629 raise
633 raise
630
634
631 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
635 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
632 # to prevent COW memory from being touched by GC.
636 # to prevent COW memory from being touched by GC.
633 # https://instagram-engineering.com/
637 # https://instagram-engineering.com/
634 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
638 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
635 pid = os.fork()
639 pid = os.fork()
636 if pid:
640 if pid:
637 try:
641 try:
638 self.ui.log(
642 self.ui.log(
639 b'cmdserver', b'forked worker process (pid=%d)\n', pid
643 b'cmdserver', b'forked worker process (pid=%d)\n', pid
640 )
644 )
641 self._workerpids.add(pid)
645 self._workerpids.add(pid)
642 h.newconnection()
646 h.newconnection()
643 finally:
647 finally:
644 conn.close() # release handle in parent process
648 conn.close() # release handle in parent process
645 else:
649 else:
646 try:
650 try:
647 selector.close()
651 selector.close()
648 sock.close()
652 sock.close()
649 self._mainipc.close()
653 self._mainipc.close()
650 self._runworker(conn)
654 self._runworker(conn)
651 conn.close()
655 conn.close()
652 self._workeripc.close()
656 self._workeripc.close()
653 os._exit(0)
657 os._exit(0)
654 except: # never return, hence no re-raises
658 except: # never return, hence no re-raises
655 try:
659 try:
656 self.ui.traceback(force=True)
660 self.ui.traceback(force=True)
657 finally:
661 finally:
658 os._exit(255)
662 os._exit(255)
659
663
660 def _handlemainipc(self, sock, selector):
664 def _handlemainipc(self, sock, selector):
661 """Process messages sent from a worker"""
665 """Process messages sent from a worker"""
662 try:
666 try:
663 path = sock.recv(32768) # large enough to receive path
667 path = sock.recv(32768) # large enough to receive path
664 except socket.error as inst:
668 except socket.error as inst:
665 if inst.args[0] == errno.EINTR:
669 if inst.args[0] == errno.EINTR:
666 return
670 return
667 raise
671 raise
668 self._repoloader.load(path)
672 self._repoloader.load(path)
669
673
670 def _sigchldhandler(self, signal, frame):
674 def _sigchldhandler(self, signal, frame):
671 self._reapworkers(os.WNOHANG)
675 self._reapworkers(os.WNOHANG)
672
676
673 def _reapworkers(self, options):
677 def _reapworkers(self, options):
674 while self._workerpids:
678 while self._workerpids:
675 try:
679 try:
676 pid, _status = os.waitpid(-1, options)
680 pid, _status = os.waitpid(-1, options)
677 except OSError as inst:
681 except OSError as inst:
678 if inst.errno == errno.EINTR:
682 if inst.errno == errno.EINTR:
679 continue
683 continue
680 if inst.errno != errno.ECHILD:
684 if inst.errno != errno.ECHILD:
681 raise
685 raise
682 # no child processes at all (reaped by other waitpid()?)
686 # no child processes at all (reaped by other waitpid()?)
683 self._workerpids.clear()
687 self._workerpids.clear()
684 return
688 return
685 if pid == 0:
689 if pid == 0:
686 # no waitable child processes
690 # no waitable child processes
687 return
691 return
688 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
692 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
689 self._workerpids.discard(pid)
693 self._workerpids.discard(pid)
690
694
691 def _runworker(self, conn):
695 def _runworker(self, conn):
692 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
696 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
693 _initworkerprocess()
697 _initworkerprocess()
694 h = self._servicehandler
698 h = self._servicehandler
695 try:
699 try:
696 _serverequest(
700 _serverequest(
697 self.ui,
701 self.ui,
698 self.repo,
702 self.repo,
699 conn,
703 conn,
700 h.createcmdserver,
704 h.createcmdserver,
701 prereposetups=[self._reposetup],
705 prereposetups=[self._reposetup],
702 )
706 )
703 finally:
707 finally:
704 gc.collect() # trigger __del__ since worker process uses os._exit
708 gc.collect() # trigger __del__ since worker process uses os._exit
705
709
706 def _reposetup(self, ui, repo):
710 def _reposetup(self, ui, repo):
707 if not repo.local():
711 if not repo.local():
708 return
712 return
709
713
710 class unixcmdserverrepo(repo.__class__):
714 class unixcmdserverrepo(repo.__class__):
711 def close(self):
715 def close(self):
712 super(unixcmdserverrepo, self).close()
716 super(unixcmdserverrepo, self).close()
713 try:
717 try:
714 self._cmdserveripc.send(self.root)
718 self._cmdserveripc.send(self.root)
715 except socket.error:
719 except socket.error:
716 self.ui.log(
720 self.ui.log(
717 b'cmdserver', b'failed to send repo root to master\n'
721 b'cmdserver', b'failed to send repo root to master\n'
718 )
722 )
719
723
720 repo.__class__ = unixcmdserverrepo
724 repo.__class__ = unixcmdserverrepo
721 repo._cmdserveripc = self._workeripc
725 repo._cmdserveripc = self._workeripc
722
726
723 cachedrepo = self._repoloader.get(repo.root)
727 cachedrepo = self._repoloader.get(repo.root)
724 if cachedrepo is None:
728 if cachedrepo is None:
725 return
729 return
726 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
730 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
727 repocache.copycache(cachedrepo, repo)
731 repocache.copycache(cachedrepo, repo)
General Comments 0
You need to be logged in to leave comments. Login now