##// END OF EJS Templates
safehasattr: pass attribute name as string instead of bytes...
marmoute -
r51451:ef5435e7 default
parent child Browse files
Show More
@@ -1,739 +1,739 b''
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Olivia Mackall <olivia@selenic.com>
3 # Copyright Olivia Mackall <olivia@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import gc
9 import gc
10 import os
10 import os
11 import random
11 import random
12 import selectors
12 import selectors
13 import signal
13 import signal
14 import socket
14 import socket
15 import struct
15 import struct
16 import traceback
16 import traceback
17
17
18 from .i18n import _
18 from .i18n import _
19 from .pycompat import getattr
19 from .pycompat import getattr
20 from . import (
20 from . import (
21 encoding,
21 encoding,
22 error,
22 error,
23 loggingutil,
23 loggingutil,
24 pycompat,
24 pycompat,
25 repocache,
25 repocache,
26 util,
26 util,
27 vfs as vfsmod,
27 vfs as vfsmod,
28 )
28 )
29 from .utils import (
29 from .utils import (
30 cborutil,
30 cborutil,
31 procutil,
31 procutil,
32 )
32 )
33
33
34
34
35 class channeledoutput:
35 class channeledoutput:
36 """
36 """
37 Write data to out in the following format:
37 Write data to out in the following format:
38
38
39 data length (unsigned int),
39 data length (unsigned int),
40 data
40 data
41 """
41 """
42
42
43 def __init__(self, out, channel):
43 def __init__(self, out, channel):
44 self.out = out
44 self.out = out
45 self.channel = channel
45 self.channel = channel
46
46
47 @property
47 @property
48 def name(self):
48 def name(self):
49 return b'<%c-channel>' % self.channel
49 return b'<%c-channel>' % self.channel
50
50
51 def write(self, data):
51 def write(self, data):
52 if not data:
52 if not data:
53 return
53 return
54 # single write() to guarantee the same atomicity as the underlying file
54 # single write() to guarantee the same atomicity as the underlying file
55 self.out.write(struct.pack(b'>cI', self.channel, len(data)) + data)
55 self.out.write(struct.pack(b'>cI', self.channel, len(data)) + data)
56 self.out.flush()
56 self.out.flush()
57
57
58 def __getattr__(self, attr):
58 def __getattr__(self, attr):
59 if attr in ('isatty', 'fileno', 'tell', 'seek'):
59 if attr in ('isatty', 'fileno', 'tell', 'seek'):
60 raise AttributeError(attr)
60 raise AttributeError(attr)
61 return getattr(self.out, attr)
61 return getattr(self.out, attr)
62
62
63
63
64 class channeledmessage:
64 class channeledmessage:
65 """
65 """
66 Write encoded message and metadata to out in the following format:
66 Write encoded message and metadata to out in the following format:
67
67
68 data length (unsigned int),
68 data length (unsigned int),
69 encoded message and metadata, as a flat key-value dict.
69 encoded message and metadata, as a flat key-value dict.
70
70
71 Each message should have 'type' attribute. Messages of unknown type
71 Each message should have 'type' attribute. Messages of unknown type
72 should be ignored.
72 should be ignored.
73 """
73 """
74
74
75 # teach ui that write() can take **opts
75 # teach ui that write() can take **opts
76 structured = True
76 structured = True
77
77
78 def __init__(self, out, channel, encodename, encodefn):
78 def __init__(self, out, channel, encodename, encodefn):
79 self._cout = channeledoutput(out, channel)
79 self._cout = channeledoutput(out, channel)
80 self.encoding = encodename
80 self.encoding = encodename
81 self._encodefn = encodefn
81 self._encodefn = encodefn
82
82
83 def write(self, data, **opts):
83 def write(self, data, **opts):
84 opts = pycompat.byteskwargs(opts)
84 opts = pycompat.byteskwargs(opts)
85 if data is not None:
85 if data is not None:
86 opts[b'data'] = data
86 opts[b'data'] = data
87 self._cout.write(self._encodefn(opts))
87 self._cout.write(self._encodefn(opts))
88
88
89 def __getattr__(self, attr):
89 def __getattr__(self, attr):
90 return getattr(self._cout, attr)
90 return getattr(self._cout, attr)
91
91
92
92
93 class channeledinput:
93 class channeledinput:
94 """
94 """
95 Read data from in_.
95 Read data from in_.
96
96
97 Requests for input are written to out in the following format:
97 Requests for input are written to out in the following format:
98 channel identifier - 'I' for plain input, 'L' line based (1 byte)
98 channel identifier - 'I' for plain input, 'L' line based (1 byte)
99 how many bytes to send at most (unsigned int),
99 how many bytes to send at most (unsigned int),
100
100
101 The client replies with:
101 The client replies with:
102 data length (unsigned int), 0 meaning EOF
102 data length (unsigned int), 0 meaning EOF
103 data
103 data
104 """
104 """
105
105
106 maxchunksize = 4 * 1024
106 maxchunksize = 4 * 1024
107
107
108 def __init__(self, in_, out, channel):
108 def __init__(self, in_, out, channel):
109 self.in_ = in_
109 self.in_ = in_
110 self.out = out
110 self.out = out
111 self.channel = channel
111 self.channel = channel
112
112
113 @property
113 @property
114 def name(self):
114 def name(self):
115 return b'<%c-channel>' % self.channel
115 return b'<%c-channel>' % self.channel
116
116
117 def read(self, size=-1):
117 def read(self, size=-1):
118 if size < 0:
118 if size < 0:
119 # if we need to consume all the clients input, ask for 4k chunks
119 # if we need to consume all the clients input, ask for 4k chunks
120 # so the pipe doesn't fill up risking a deadlock
120 # so the pipe doesn't fill up risking a deadlock
121 size = self.maxchunksize
121 size = self.maxchunksize
122 s = self._read(size, self.channel)
122 s = self._read(size, self.channel)
123 buf = s
123 buf = s
124 while s:
124 while s:
125 s = self._read(size, self.channel)
125 s = self._read(size, self.channel)
126 buf += s
126 buf += s
127
127
128 return buf
128 return buf
129 else:
129 else:
130 return self._read(size, self.channel)
130 return self._read(size, self.channel)
131
131
132 def _read(self, size, channel):
132 def _read(self, size, channel):
133 if not size:
133 if not size:
134 return b''
134 return b''
135 assert size > 0
135 assert size > 0
136
136
137 # tell the client we need at most size bytes
137 # tell the client we need at most size bytes
138 self.out.write(struct.pack(b'>cI', channel, size))
138 self.out.write(struct.pack(b'>cI', channel, size))
139 self.out.flush()
139 self.out.flush()
140
140
141 length = self.in_.read(4)
141 length = self.in_.read(4)
142 length = struct.unpack(b'>I', length)[0]
142 length = struct.unpack(b'>I', length)[0]
143 if not length:
143 if not length:
144 return b''
144 return b''
145 else:
145 else:
146 return self.in_.read(length)
146 return self.in_.read(length)
147
147
148 def readline(self, size=-1):
148 def readline(self, size=-1):
149 if size < 0:
149 if size < 0:
150 size = self.maxchunksize
150 size = self.maxchunksize
151 s = self._read(size, b'L')
151 s = self._read(size, b'L')
152 buf = s
152 buf = s
153 # keep asking for more until there's either no more or
153 # keep asking for more until there's either no more or
154 # we got a full line
154 # we got a full line
155 while s and not s.endswith(b'\n'):
155 while s and not s.endswith(b'\n'):
156 s = self._read(size, b'L')
156 s = self._read(size, b'L')
157 buf += s
157 buf += s
158
158
159 return buf
159 return buf
160 else:
160 else:
161 return self._read(size, b'L')
161 return self._read(size, b'L')
162
162
163 def __iter__(self):
163 def __iter__(self):
164 return self
164 return self
165
165
166 def next(self):
166 def next(self):
167 l = self.readline()
167 l = self.readline()
168 if not l:
168 if not l:
169 raise StopIteration
169 raise StopIteration
170 return l
170 return l
171
171
172 __next__ = next
172 __next__ = next
173
173
174 def __getattr__(self, attr):
174 def __getattr__(self, attr):
175 if attr in ('isatty', 'fileno', 'tell', 'seek'):
175 if attr in ('isatty', 'fileno', 'tell', 'seek'):
176 raise AttributeError(attr)
176 raise AttributeError(attr)
177 return getattr(self.in_, attr)
177 return getattr(self.in_, attr)
178
178
179
179
180 _messageencoders = {
180 _messageencoders = {
181 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
181 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
182 }
182 }
183
183
184
184
185 def _selectmessageencoder(ui):
185 def _selectmessageencoder(ui):
186 encnames = ui.configlist(b'cmdserver', b'message-encodings')
186 encnames = ui.configlist(b'cmdserver', b'message-encodings')
187 for n in encnames:
187 for n in encnames:
188 f = _messageencoders.get(n)
188 f = _messageencoders.get(n)
189 if f:
189 if f:
190 return n, f
190 return n, f
191 raise error.Abort(
191 raise error.Abort(
192 b'no supported message encodings: %s' % b' '.join(encnames)
192 b'no supported message encodings: %s' % b' '.join(encnames)
193 )
193 )
194
194
195
195
196 class server:
196 class server:
197 """
197 """
198 Listens for commands on fin, runs them and writes the output on a channel
198 Listens for commands on fin, runs them and writes the output on a channel
199 based stream to fout.
199 based stream to fout.
200 """
200 """
201
201
202 def __init__(self, ui, repo, fin, fout, prereposetups=None):
202 def __init__(self, ui, repo, fin, fout, prereposetups=None):
203 self.cwd = encoding.getcwd()
203 self.cwd = encoding.getcwd()
204
204
205 if repo:
205 if repo:
206 # the ui here is really the repo ui so take its baseui so we don't
206 # the ui here is really the repo ui so take its baseui so we don't
207 # end up with its local configuration
207 # end up with its local configuration
208 self.ui = repo.baseui
208 self.ui = repo.baseui
209 self.repo = repo
209 self.repo = repo
210 self.repoui = repo.ui
210 self.repoui = repo.ui
211 else:
211 else:
212 self.ui = ui
212 self.ui = ui
213 self.repo = self.repoui = None
213 self.repo = self.repoui = None
214 self._prereposetups = prereposetups
214 self._prereposetups = prereposetups
215
215
216 self.cdebug = channeledoutput(fout, b'd')
216 self.cdebug = channeledoutput(fout, b'd')
217 self.cerr = channeledoutput(fout, b'e')
217 self.cerr = channeledoutput(fout, b'e')
218 self.cout = channeledoutput(fout, b'o')
218 self.cout = channeledoutput(fout, b'o')
219 self.cin = channeledinput(fin, fout, b'I')
219 self.cin = channeledinput(fin, fout, b'I')
220 self.cresult = channeledoutput(fout, b'r')
220 self.cresult = channeledoutput(fout, b'r')
221
221
222 if self.ui.config(b'cmdserver', b'log') == b'-':
222 if self.ui.config(b'cmdserver', b'log') == b'-':
223 # switch log stream of server's ui to the 'd' (debug) channel
223 # switch log stream of server's ui to the 'd' (debug) channel
224 # (don't touch repo.ui as its lifetime is longer than the server)
224 # (don't touch repo.ui as its lifetime is longer than the server)
225 self.ui = self.ui.copy()
225 self.ui = self.ui.copy()
226 setuplogging(self.ui, repo=None, fp=self.cdebug)
226 setuplogging(self.ui, repo=None, fp=self.cdebug)
227
227
228 self.cmsg = None
228 self.cmsg = None
229 if ui.config(b'ui', b'message-output') == b'channel':
229 if ui.config(b'ui', b'message-output') == b'channel':
230 encname, encfn = _selectmessageencoder(ui)
230 encname, encfn = _selectmessageencoder(ui)
231 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
231 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
232
232
233 self.client = fin
233 self.client = fin
234
234
235 # If shutdown-on-interrupt is off, the default SIGINT handler is
235 # If shutdown-on-interrupt is off, the default SIGINT handler is
236 # removed so that client-server communication wouldn't be interrupted.
236 # removed so that client-server communication wouldn't be interrupted.
237 # For example, 'runcommand' handler will issue three short read()s.
237 # For example, 'runcommand' handler will issue three short read()s.
238 # If one of the first two read()s were interrupted, the communication
238 # If one of the first two read()s were interrupted, the communication
239 # channel would be left at dirty state and the subsequent request
239 # channel would be left at dirty state and the subsequent request
240 # wouldn't be parsed. So catching KeyboardInterrupt isn't enough.
240 # wouldn't be parsed. So catching KeyboardInterrupt isn't enough.
241 self._shutdown_on_interrupt = ui.configbool(
241 self._shutdown_on_interrupt = ui.configbool(
242 b'cmdserver', b'shutdown-on-interrupt'
242 b'cmdserver', b'shutdown-on-interrupt'
243 )
243 )
244 self._old_inthandler = None
244 self._old_inthandler = None
245 if not self._shutdown_on_interrupt:
245 if not self._shutdown_on_interrupt:
246 self._old_inthandler = signal.signal(signal.SIGINT, signal.SIG_IGN)
246 self._old_inthandler = signal.signal(signal.SIGINT, signal.SIG_IGN)
247
247
248 def cleanup(self):
248 def cleanup(self):
249 """release and restore resources taken during server session"""
249 """release and restore resources taken during server session"""
250 if not self._shutdown_on_interrupt:
250 if not self._shutdown_on_interrupt:
251 signal.signal(signal.SIGINT, self._old_inthandler)
251 signal.signal(signal.SIGINT, self._old_inthandler)
252
252
253 def _read(self, size):
253 def _read(self, size):
254 if not size:
254 if not size:
255 return b''
255 return b''
256
256
257 data = self.client.read(size)
257 data = self.client.read(size)
258
258
259 # is the other end closed?
259 # is the other end closed?
260 if not data:
260 if not data:
261 raise EOFError
261 raise EOFError
262
262
263 return data
263 return data
264
264
265 def _readstr(self):
265 def _readstr(self):
266 """read a string from the channel
266 """read a string from the channel
267
267
268 format:
268 format:
269 data length (uint32), data
269 data length (uint32), data
270 """
270 """
271 length = struct.unpack(b'>I', self._read(4))[0]
271 length = struct.unpack(b'>I', self._read(4))[0]
272 if not length:
272 if not length:
273 return b''
273 return b''
274 return self._read(length)
274 return self._read(length)
275
275
276 def _readlist(self):
276 def _readlist(self):
277 """read a list of NULL separated strings from the channel"""
277 """read a list of NULL separated strings from the channel"""
278 s = self._readstr()
278 s = self._readstr()
279 if s:
279 if s:
280 return s.split(b'\0')
280 return s.split(b'\0')
281 else:
281 else:
282 return []
282 return []
283
283
284 def _dispatchcommand(self, req):
284 def _dispatchcommand(self, req):
285 from . import dispatch # avoid cycle
285 from . import dispatch # avoid cycle
286
286
287 if self._shutdown_on_interrupt:
287 if self._shutdown_on_interrupt:
288 # no need to restore SIGINT handler as it is unmodified.
288 # no need to restore SIGINT handler as it is unmodified.
289 return dispatch.dispatch(req)
289 return dispatch.dispatch(req)
290
290
291 try:
291 try:
292 signal.signal(signal.SIGINT, self._old_inthandler)
292 signal.signal(signal.SIGINT, self._old_inthandler)
293 return dispatch.dispatch(req)
293 return dispatch.dispatch(req)
294 except error.SignalInterrupt:
294 except error.SignalInterrupt:
295 # propagate SIGBREAK, SIGHUP, or SIGTERM.
295 # propagate SIGBREAK, SIGHUP, or SIGTERM.
296 raise
296 raise
297 except KeyboardInterrupt:
297 except KeyboardInterrupt:
298 # SIGINT may be received out of the try-except block of dispatch(),
298 # SIGINT may be received out of the try-except block of dispatch(),
299 # so catch it as last ditch. Another KeyboardInterrupt may be
299 # so catch it as last ditch. Another KeyboardInterrupt may be
300 # raised while handling exceptions here, but there's no way to
300 # raised while handling exceptions here, but there's no way to
301 # avoid that except for doing everything in C.
301 # avoid that except for doing everything in C.
302 pass
302 pass
303 finally:
303 finally:
304 signal.signal(signal.SIGINT, signal.SIG_IGN)
304 signal.signal(signal.SIGINT, signal.SIG_IGN)
305 # On KeyboardInterrupt, print error message and exit *after* SIGINT
305 # On KeyboardInterrupt, print error message and exit *after* SIGINT
306 # handler removed.
306 # handler removed.
307 req.ui.error(_(b'interrupted!\n'))
307 req.ui.error(_(b'interrupted!\n'))
308 return -1
308 return -1
309
309
310 def runcommand(self):
310 def runcommand(self):
311 """reads a list of \0 terminated arguments, executes
311 """reads a list of \0 terminated arguments, executes
312 and writes the return code to the result channel"""
312 and writes the return code to the result channel"""
313 from . import dispatch # avoid cycle
313 from . import dispatch # avoid cycle
314
314
315 args = self._readlist()
315 args = self._readlist()
316
316
317 # copy the uis so changes (e.g. --config or --verbose) don't
317 # copy the uis so changes (e.g. --config or --verbose) don't
318 # persist between requests
318 # persist between requests
319 copiedui = self.ui.copy()
319 copiedui = self.ui.copy()
320 uis = [copiedui]
320 uis = [copiedui]
321 if self.repo:
321 if self.repo:
322 self.repo.baseui = copiedui
322 self.repo.baseui = copiedui
323 # clone ui without using ui.copy because this is protected
323 # clone ui without using ui.copy because this is protected
324 repoui = self.repoui.__class__(self.repoui)
324 repoui = self.repoui.__class__(self.repoui)
325 repoui.copy = copiedui.copy # redo copy protection
325 repoui.copy = copiedui.copy # redo copy protection
326 uis.append(repoui)
326 uis.append(repoui)
327 self.repo.ui = self.repo.dirstate._ui = repoui
327 self.repo.ui = self.repo.dirstate._ui = repoui
328 self.repo.invalidateall()
328 self.repo.invalidateall()
329
329
330 for ui in uis:
330 for ui in uis:
331 ui.resetstate()
331 ui.resetstate()
332 # any kind of interaction must use server channels, but chg may
332 # any kind of interaction must use server channels, but chg may
333 # replace channels by fully functional tty files. so nontty is
333 # replace channels by fully functional tty files. so nontty is
334 # enforced only if cin is a channel.
334 # enforced only if cin is a channel.
335 if not util.safehasattr(self.cin, 'fileno'):
335 if not util.safehasattr(self.cin, 'fileno'):
336 ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')
336 ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')
337
337
338 req = dispatch.request(
338 req = dispatch.request(
339 args[:],
339 args[:],
340 copiedui,
340 copiedui,
341 self.repo,
341 self.repo,
342 self.cin,
342 self.cin,
343 self.cout,
343 self.cout,
344 self.cerr,
344 self.cerr,
345 self.cmsg,
345 self.cmsg,
346 prereposetups=self._prereposetups,
346 prereposetups=self._prereposetups,
347 )
347 )
348
348
349 try:
349 try:
350 ret = self._dispatchcommand(req) & 255
350 ret = self._dispatchcommand(req) & 255
351 # If shutdown-on-interrupt is off, it's important to write the
351 # If shutdown-on-interrupt is off, it's important to write the
352 # result code *after* SIGINT handler removed. If the result code
352 # result code *after* SIGINT handler removed. If the result code
353 # were lost, the client wouldn't be able to continue processing.
353 # were lost, the client wouldn't be able to continue processing.
354 self.cresult.write(struct.pack(b'>i', int(ret)))
354 self.cresult.write(struct.pack(b'>i', int(ret)))
355 finally:
355 finally:
356 # restore old cwd
356 # restore old cwd
357 if b'--cwd' in args:
357 if b'--cwd' in args:
358 os.chdir(self.cwd)
358 os.chdir(self.cwd)
359
359
360 def getencoding(self):
360 def getencoding(self):
361 """writes the current encoding to the result channel"""
361 """writes the current encoding to the result channel"""
362 self.cresult.write(encoding.encoding)
362 self.cresult.write(encoding.encoding)
363
363
364 def serveone(self):
364 def serveone(self):
365 cmd = self.client.readline()[:-1]
365 cmd = self.client.readline()[:-1]
366 if cmd:
366 if cmd:
367 handler = self.capabilities.get(cmd)
367 handler = self.capabilities.get(cmd)
368 if handler:
368 if handler:
369 handler(self)
369 handler(self)
370 else:
370 else:
371 # clients are expected to check what commands are supported by
371 # clients are expected to check what commands are supported by
372 # looking at the servers capabilities
372 # looking at the servers capabilities
373 raise error.Abort(_(b'unknown command %s') % cmd)
373 raise error.Abort(_(b'unknown command %s') % cmd)
374
374
375 return cmd != b''
375 return cmd != b''
376
376
377 capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}
377 capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}
378
378
379 def serve(self):
379 def serve(self):
380 hellomsg = b'capabilities: ' + b' '.join(sorted(self.capabilities))
380 hellomsg = b'capabilities: ' + b' '.join(sorted(self.capabilities))
381 hellomsg += b'\n'
381 hellomsg += b'\n'
382 hellomsg += b'encoding: ' + encoding.encoding
382 hellomsg += b'encoding: ' + encoding.encoding
383 hellomsg += b'\n'
383 hellomsg += b'\n'
384 if self.cmsg:
384 if self.cmsg:
385 hellomsg += b'message-encoding: %s\n' % self.cmsg.encoding
385 hellomsg += b'message-encoding: %s\n' % self.cmsg.encoding
386 hellomsg += b'pid: %d' % procutil.getpid()
386 hellomsg += b'pid: %d' % procutil.getpid()
387 if util.safehasattr(os, b'getpgid'):
387 if util.safehasattr(os, 'getpgid'):
388 hellomsg += b'\n'
388 hellomsg += b'\n'
389 hellomsg += b'pgid: %d' % os.getpgid(0)
389 hellomsg += b'pgid: %d' % os.getpgid(0)
390
390
391 # write the hello msg in -one- chunk
391 # write the hello msg in -one- chunk
392 self.cout.write(hellomsg)
392 self.cout.write(hellomsg)
393
393
394 try:
394 try:
395 while self.serveone():
395 while self.serveone():
396 pass
396 pass
397 except EOFError:
397 except EOFError:
398 # we'll get here if the client disconnected while we were reading
398 # we'll get here if the client disconnected while we were reading
399 # its request
399 # its request
400 return 1
400 return 1
401
401
402 return 0
402 return 0
403
403
404
404
405 def setuplogging(ui, repo=None, fp=None):
405 def setuplogging(ui, repo=None, fp=None):
406 """Set up server logging facility
406 """Set up server logging facility
407
407
408 If cmdserver.log is '-', log messages will be sent to the given fp.
408 If cmdserver.log is '-', log messages will be sent to the given fp.
409 It should be the 'd' channel while a client is connected, and otherwise
409 It should be the 'd' channel while a client is connected, and otherwise
410 is the stderr of the server process.
410 is the stderr of the server process.
411 """
411 """
412 # developer config: cmdserver.log
412 # developer config: cmdserver.log
413 logpath = ui.config(b'cmdserver', b'log')
413 logpath = ui.config(b'cmdserver', b'log')
414 if not logpath:
414 if not logpath:
415 return
415 return
416 # developer config: cmdserver.track-log
416 # developer config: cmdserver.track-log
417 tracked = set(ui.configlist(b'cmdserver', b'track-log'))
417 tracked = set(ui.configlist(b'cmdserver', b'track-log'))
418
418
419 if logpath == b'-' and fp:
419 if logpath == b'-' and fp:
420 logger = loggingutil.fileobjectlogger(fp, tracked)
420 logger = loggingutil.fileobjectlogger(fp, tracked)
421 elif logpath == b'-':
421 elif logpath == b'-':
422 logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
422 logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
423 else:
423 else:
424 logpath = util.abspath(util.expandpath(logpath))
424 logpath = util.abspath(util.expandpath(logpath))
425 # developer config: cmdserver.max-log-files
425 # developer config: cmdserver.max-log-files
426 maxfiles = ui.configint(b'cmdserver', b'max-log-files')
426 maxfiles = ui.configint(b'cmdserver', b'max-log-files')
427 # developer config: cmdserver.max-log-size
427 # developer config: cmdserver.max-log-size
428 maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
428 maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
429 vfs = vfsmod.vfs(os.path.dirname(logpath))
429 vfs = vfsmod.vfs(os.path.dirname(logpath))
430 logger = loggingutil.filelogger(
430 logger = loggingutil.filelogger(
431 vfs,
431 vfs,
432 os.path.basename(logpath),
432 os.path.basename(logpath),
433 tracked,
433 tracked,
434 maxfiles=maxfiles,
434 maxfiles=maxfiles,
435 maxsize=maxsize,
435 maxsize=maxsize,
436 )
436 )
437
437
438 targetuis = {ui}
438 targetuis = {ui}
439 if repo:
439 if repo:
440 targetuis.add(repo.baseui)
440 targetuis.add(repo.baseui)
441 targetuis.add(repo.ui)
441 targetuis.add(repo.ui)
442 for u in targetuis:
442 for u in targetuis:
443 u.setlogger(b'cmdserver', logger)
443 u.setlogger(b'cmdserver', logger)
444
444
445
445
446 class pipeservice:
446 class pipeservice:
447 def __init__(self, ui, repo, opts):
447 def __init__(self, ui, repo, opts):
448 self.ui = ui
448 self.ui = ui
449 self.repo = repo
449 self.repo = repo
450
450
451 def init(self):
451 def init(self):
452 pass
452 pass
453
453
454 def run(self):
454 def run(self):
455 ui = self.ui
455 ui = self.ui
456 # redirect stdio to null device so that broken extensions or in-process
456 # redirect stdio to null device so that broken extensions or in-process
457 # hooks will never cause corruption of channel protocol.
457 # hooks will never cause corruption of channel protocol.
458 with ui.protectedfinout() as (fin, fout):
458 with ui.protectedfinout() as (fin, fout):
459 sv = server(ui, self.repo, fin, fout)
459 sv = server(ui, self.repo, fin, fout)
460 try:
460 try:
461 return sv.serve()
461 return sv.serve()
462 finally:
462 finally:
463 sv.cleanup()
463 sv.cleanup()
464
464
465
465
466 def _initworkerprocess():
466 def _initworkerprocess():
467 # use a different process group from the master process, in order to:
467 # use a different process group from the master process, in order to:
468 # 1. make the current process group no longer "orphaned" (because the
468 # 1. make the current process group no longer "orphaned" (because the
469 # parent of this process is in a different process group while
469 # parent of this process is in a different process group while
470 # remains in a same session)
470 # remains in a same session)
471 # according to POSIX 2.2.2.52, orphaned process group will ignore
471 # according to POSIX 2.2.2.52, orphaned process group will ignore
472 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
472 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
473 # cause trouble for things like ncurses.
473 # cause trouble for things like ncurses.
474 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
474 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
475 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
475 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
476 # processes like ssh will be killed properly, without affecting
476 # processes like ssh will be killed properly, without affecting
477 # unrelated processes.
477 # unrelated processes.
478 os.setpgid(0, 0)
478 os.setpgid(0, 0)
479 # change random state otherwise forked request handlers would have a
479 # change random state otherwise forked request handlers would have a
480 # same state inherited from parent.
480 # same state inherited from parent.
481 random.seed()
481 random.seed()
482
482
483
483
484 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
484 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
485 fin = conn.makefile('rb')
485 fin = conn.makefile('rb')
486 fout = conn.makefile('wb')
486 fout = conn.makefile('wb')
487 sv = None
487 sv = None
488 try:
488 try:
489 sv = createcmdserver(repo, conn, fin, fout, prereposetups)
489 sv = createcmdserver(repo, conn, fin, fout, prereposetups)
490 try:
490 try:
491 sv.serve()
491 sv.serve()
492 # handle exceptions that may be raised by command server. most of
492 # handle exceptions that may be raised by command server. most of
493 # known exceptions are caught by dispatch.
493 # known exceptions are caught by dispatch.
494 except error.Abort as inst:
494 except error.Abort as inst:
495 ui.error(_(b'abort: %s\n') % inst.message)
495 ui.error(_(b'abort: %s\n') % inst.message)
496 except BrokenPipeError:
496 except BrokenPipeError:
497 pass
497 pass
498 except KeyboardInterrupt:
498 except KeyboardInterrupt:
499 pass
499 pass
500 finally:
500 finally:
501 sv.cleanup()
501 sv.cleanup()
502 except: # re-raises
502 except: # re-raises
503 # also write traceback to error channel. otherwise client cannot
503 # also write traceback to error channel. otherwise client cannot
504 # see it because it is written to server's stderr by default.
504 # see it because it is written to server's stderr by default.
505 if sv:
505 if sv:
506 cerr = sv.cerr
506 cerr = sv.cerr
507 else:
507 else:
508 cerr = channeledoutput(fout, b'e')
508 cerr = channeledoutput(fout, b'e')
509 cerr.write(encoding.strtolocal(traceback.format_exc()))
509 cerr.write(encoding.strtolocal(traceback.format_exc()))
510 raise
510 raise
511 finally:
511 finally:
512 fin.close()
512 fin.close()
513 try:
513 try:
514 fout.close() # implicit flush() may cause another EPIPE
514 fout.close() # implicit flush() may cause another EPIPE
515 except BrokenPipeError:
515 except BrokenPipeError:
516 pass
516 pass
517
517
518
518
519 class unixservicehandler:
519 class unixservicehandler:
520 """Set of pluggable operations for unix-mode services
520 """Set of pluggable operations for unix-mode services
521
521
522 Almost all methods except for createcmdserver() are called in the main
522 Almost all methods except for createcmdserver() are called in the main
523 process. You can't pass mutable resource back from createcmdserver().
523 process. You can't pass mutable resource back from createcmdserver().
524 """
524 """
525
525
526 pollinterval = None
526 pollinterval = None
527
527
528 def __init__(self, ui):
528 def __init__(self, ui):
529 self.ui = ui
529 self.ui = ui
530
530
531 def bindsocket(self, sock, address):
531 def bindsocket(self, sock, address):
532 util.bindunixsocket(sock, address)
532 util.bindunixsocket(sock, address)
533 sock.listen(socket.SOMAXCONN)
533 sock.listen(socket.SOMAXCONN)
534 self.ui.status(_(b'listening at %s\n') % address)
534 self.ui.status(_(b'listening at %s\n') % address)
535 self.ui.flush() # avoid buffering of status message
535 self.ui.flush() # avoid buffering of status message
536
536
537 def unlinksocket(self, address):
537 def unlinksocket(self, address):
538 os.unlink(address)
538 os.unlink(address)
539
539
540 def shouldexit(self):
540 def shouldexit(self):
541 """True if server should shut down; checked per pollinterval"""
541 """True if server should shut down; checked per pollinterval"""
542 return False
542 return False
543
543
544 def newconnection(self):
544 def newconnection(self):
545 """Called when main process notices new connection"""
545 """Called when main process notices new connection"""
546
546
547 def createcmdserver(self, repo, conn, fin, fout, prereposetups):
547 def createcmdserver(self, repo, conn, fin, fout, prereposetups):
548 """Create new command server instance; called in the process that
548 """Create new command server instance; called in the process that
549 serves for the current connection"""
549 serves for the current connection"""
550 return server(self.ui, repo, fin, fout, prereposetups)
550 return server(self.ui, repo, fin, fout, prereposetups)
551
551
552
552
553 class unixforkingservice:
553 class unixforkingservice:
554 """
554 """
555 Listens on unix domain socket and forks server per connection
555 Listens on unix domain socket and forks server per connection
556 """
556 """
557
557
558 def __init__(self, ui, repo, opts, handler=None):
558 def __init__(self, ui, repo, opts, handler=None):
559 self.ui = ui
559 self.ui = ui
560 self.repo = repo
560 self.repo = repo
561 self.address = opts[b'address']
561 self.address = opts[b'address']
562 if not util.safehasattr(socket, b'AF_UNIX'):
562 if not util.safehasattr(socket, b'AF_UNIX'):
563 raise error.Abort(_(b'unsupported platform'))
563 raise error.Abort(_(b'unsupported platform'))
564 if not self.address:
564 if not self.address:
565 raise error.Abort(_(b'no socket path specified with --address'))
565 raise error.Abort(_(b'no socket path specified with --address'))
566 self._servicehandler = handler or unixservicehandler(ui)
566 self._servicehandler = handler or unixservicehandler(ui)
567 self._sock = None
567 self._sock = None
568 self._mainipc = None
568 self._mainipc = None
569 self._workeripc = None
569 self._workeripc = None
570 self._oldsigchldhandler = None
570 self._oldsigchldhandler = None
571 self._workerpids = set() # updated by signal handler; do not iterate
571 self._workerpids = set() # updated by signal handler; do not iterate
572 self._socketunlinked = None
572 self._socketunlinked = None
573 # experimental config: cmdserver.max-repo-cache
573 # experimental config: cmdserver.max-repo-cache
574 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
574 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
575 if maxlen < 0:
575 if maxlen < 0:
576 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
576 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
577 self._repoloader = repocache.repoloader(ui, maxlen)
577 self._repoloader = repocache.repoloader(ui, maxlen)
578 # attempt to avoid crash in CoreFoundation when using chg after fix in
578 # attempt to avoid crash in CoreFoundation when using chg after fix in
579 # a89381e04c58
579 # a89381e04c58
580 if pycompat.isdarwin:
580 if pycompat.isdarwin:
581 procutil.gui()
581 procutil.gui()
582
582
583 def init(self):
583 def init(self):
584 self._sock = socket.socket(socket.AF_UNIX)
584 self._sock = socket.socket(socket.AF_UNIX)
585 # IPC channel from many workers to one main process; this is actually
585 # IPC channel from many workers to one main process; this is actually
586 # a uni-directional pipe, but is backed by a DGRAM socket so each
586 # a uni-directional pipe, but is backed by a DGRAM socket so each
587 # message can be easily separated.
587 # message can be easily separated.
588 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
588 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
589 self._mainipc, self._workeripc = o
589 self._mainipc, self._workeripc = o
590 self._servicehandler.bindsocket(self._sock, self.address)
590 self._servicehandler.bindsocket(self._sock, self.address)
591 if util.safehasattr(procutil, b'unblocksignal'):
591 if util.safehasattr(procutil, b'unblocksignal'):
592 procutil.unblocksignal(signal.SIGCHLD)
592 procutil.unblocksignal(signal.SIGCHLD)
593 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
593 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
594 self._oldsigchldhandler = o
594 self._oldsigchldhandler = o
595 self._socketunlinked = False
595 self._socketunlinked = False
596 self._repoloader.start()
596 self._repoloader.start()
597
597
598 def _unlinksocket(self):
598 def _unlinksocket(self):
599 if not self._socketunlinked:
599 if not self._socketunlinked:
600 self._servicehandler.unlinksocket(self.address)
600 self._servicehandler.unlinksocket(self.address)
601 self._socketunlinked = True
601 self._socketunlinked = True
602
602
603 def _cleanup(self):
603 def _cleanup(self):
604 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
604 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
605 self._sock.close()
605 self._sock.close()
606 self._mainipc.close()
606 self._mainipc.close()
607 self._workeripc.close()
607 self._workeripc.close()
608 self._unlinksocket()
608 self._unlinksocket()
609 self._repoloader.stop()
609 self._repoloader.stop()
610 # don't kill child processes as they have active clients, just wait
610 # don't kill child processes as they have active clients, just wait
611 self._reapworkers(0)
611 self._reapworkers(0)
612
612
613 def run(self):
613 def run(self):
614 try:
614 try:
615 self._mainloop()
615 self._mainloop()
616 finally:
616 finally:
617 self._cleanup()
617 self._cleanup()
618
618
619 def _mainloop(self):
619 def _mainloop(self):
620 exiting = False
620 exiting = False
621 h = self._servicehandler
621 h = self._servicehandler
622 selector = selectors.DefaultSelector()
622 selector = selectors.DefaultSelector()
623 selector.register(
623 selector.register(
624 self._sock, selectors.EVENT_READ, self._acceptnewconnection
624 self._sock, selectors.EVENT_READ, self._acceptnewconnection
625 )
625 )
626 selector.register(
626 selector.register(
627 self._mainipc, selectors.EVENT_READ, self._handlemainipc
627 self._mainipc, selectors.EVENT_READ, self._handlemainipc
628 )
628 )
629 while True:
629 while True:
630 if not exiting and h.shouldexit():
630 if not exiting and h.shouldexit():
631 # clients can no longer connect() to the domain socket, so
631 # clients can no longer connect() to the domain socket, so
632 # we stop queuing new requests.
632 # we stop queuing new requests.
633 # for requests that are queued (connect()-ed, but haven't been
633 # for requests that are queued (connect()-ed, but haven't been
634 # accept()-ed), handle them before exit. otherwise, clients
634 # accept()-ed), handle them before exit. otherwise, clients
635 # waiting for recv() will receive ECONNRESET.
635 # waiting for recv() will receive ECONNRESET.
636 self._unlinksocket()
636 self._unlinksocket()
637 exiting = True
637 exiting = True
638 events = selector.select(timeout=h.pollinterval)
638 events = selector.select(timeout=h.pollinterval)
639 if not events:
639 if not events:
640 # only exit if we completed all queued requests
640 # only exit if we completed all queued requests
641 if exiting:
641 if exiting:
642 break
642 break
643 continue
643 continue
644 for key, _mask in events:
644 for key, _mask in events:
645 key.data(key.fileobj, selector)
645 key.data(key.fileobj, selector)
646 selector.close()
646 selector.close()
647
647
648 def _acceptnewconnection(self, sock, selector):
648 def _acceptnewconnection(self, sock, selector):
649 h = self._servicehandler
649 h = self._servicehandler
650 conn, _addr = sock.accept()
650 conn, _addr = sock.accept()
651
651
652 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
652 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
653 # to prevent COW memory from being touched by GC.
653 # to prevent COW memory from being touched by GC.
654 # https://instagram-engineering.com/
654 # https://instagram-engineering.com/
655 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
655 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
656 pid = os.fork()
656 pid = os.fork()
657 if pid:
657 if pid:
658 try:
658 try:
659 self.ui.log(
659 self.ui.log(
660 b'cmdserver', b'forked worker process (pid=%d)\n', pid
660 b'cmdserver', b'forked worker process (pid=%d)\n', pid
661 )
661 )
662 self._workerpids.add(pid)
662 self._workerpids.add(pid)
663 h.newconnection()
663 h.newconnection()
664 finally:
664 finally:
665 conn.close() # release handle in parent process
665 conn.close() # release handle in parent process
666 else:
666 else:
667 try:
667 try:
668 selector.close()
668 selector.close()
669 sock.close()
669 sock.close()
670 self._mainipc.close()
670 self._mainipc.close()
671 self._runworker(conn)
671 self._runworker(conn)
672 conn.close()
672 conn.close()
673 self._workeripc.close()
673 self._workeripc.close()
674 os._exit(0)
674 os._exit(0)
675 except: # never return, hence no re-raises
675 except: # never return, hence no re-raises
676 try:
676 try:
677 self.ui.traceback(force=True)
677 self.ui.traceback(force=True)
678 finally:
678 finally:
679 os._exit(255)
679 os._exit(255)
680
680
681 def _handlemainipc(self, sock, selector):
681 def _handlemainipc(self, sock, selector):
682 """Process messages sent from a worker"""
682 """Process messages sent from a worker"""
683 path = sock.recv(32768) # large enough to receive path
683 path = sock.recv(32768) # large enough to receive path
684 self._repoloader.load(path)
684 self._repoloader.load(path)
685
685
686 def _sigchldhandler(self, signal, frame):
686 def _sigchldhandler(self, signal, frame):
687 self._reapworkers(os.WNOHANG)
687 self._reapworkers(os.WNOHANG)
688
688
689 def _reapworkers(self, options):
689 def _reapworkers(self, options):
690 while self._workerpids:
690 while self._workerpids:
691 try:
691 try:
692 pid, _status = os.waitpid(-1, options)
692 pid, _status = os.waitpid(-1, options)
693 except ChildProcessError:
693 except ChildProcessError:
694 # no child processes at all (reaped by other waitpid()?)
694 # no child processes at all (reaped by other waitpid()?)
695 self._workerpids.clear()
695 self._workerpids.clear()
696 return
696 return
697 if pid == 0:
697 if pid == 0:
698 # no waitable child processes
698 # no waitable child processes
699 return
699 return
700 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
700 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
701 self._workerpids.discard(pid)
701 self._workerpids.discard(pid)
702
702
703 def _runworker(self, conn):
703 def _runworker(self, conn):
704 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
704 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
705 _initworkerprocess()
705 _initworkerprocess()
706 h = self._servicehandler
706 h = self._servicehandler
707 try:
707 try:
708 _serverequest(
708 _serverequest(
709 self.ui,
709 self.ui,
710 self.repo,
710 self.repo,
711 conn,
711 conn,
712 h.createcmdserver,
712 h.createcmdserver,
713 prereposetups=[self._reposetup],
713 prereposetups=[self._reposetup],
714 )
714 )
715 finally:
715 finally:
716 gc.collect() # trigger __del__ since worker process uses os._exit
716 gc.collect() # trigger __del__ since worker process uses os._exit
717
717
718 def _reposetup(self, ui, repo):
718 def _reposetup(self, ui, repo):
719 if not repo.local():
719 if not repo.local():
720 return
720 return
721
721
722 class unixcmdserverrepo(repo.__class__):
722 class unixcmdserverrepo(repo.__class__):
723 def close(self):
723 def close(self):
724 super(unixcmdserverrepo, self).close()
724 super(unixcmdserverrepo, self).close()
725 try:
725 try:
726 self._cmdserveripc.send(self.root)
726 self._cmdserveripc.send(self.root)
727 except socket.error:
727 except socket.error:
728 self.ui.log(
728 self.ui.log(
729 b'cmdserver', b'failed to send repo root to master\n'
729 b'cmdserver', b'failed to send repo root to master\n'
730 )
730 )
731
731
732 repo.__class__ = unixcmdserverrepo
732 repo.__class__ = unixcmdserverrepo
733 repo._cmdserveripc = self._workeripc
733 repo._cmdserveripc = self._workeripc
734
734
735 cachedrepo = self._repoloader.get(repo.root)
735 cachedrepo = self._repoloader.get(repo.root)
736 if cachedrepo is None:
736 if cachedrepo is None:
737 return
737 return
738 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
738 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
739 repocache.copycache(cachedrepo, repo)
739 repocache.copycache(cachedrepo, repo)
General Comments 0
You need to be logged in to leave comments. Login now