##// END OF EJS Templates
commandserver: implement name() to clarify channel is not a plain file...
Yuya Nishihara -
r27415:f4ca33e3 default
parent child Browse files
Show More
@@ -1,366 +1,374 b''
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Matt Mackall <mpm@selenic.com>
3 # Copyright Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import SocketServer
10 import SocketServer
11 import errno
11 import errno
12 import os
12 import os
13 import struct
13 import struct
14 import sys
14 import sys
15 import traceback
15 import traceback
16
16
17 from .i18n import _
17 from .i18n import _
18 from . import (
18 from . import (
19 encoding,
19 encoding,
20 error,
20 error,
21 util,
21 util,
22 )
22 )
23
23
24 logfile = None
24 logfile = None
25
25
26 def log(*args):
26 def log(*args):
27 if not logfile:
27 if not logfile:
28 return
28 return
29
29
30 for a in args:
30 for a in args:
31 logfile.write(str(a))
31 logfile.write(str(a))
32
32
33 logfile.flush()
33 logfile.flush()
34
34
35 class channeledoutput(object):
35 class channeledoutput(object):
36 """
36 """
37 Write data to out in the following format:
37 Write data to out in the following format:
38
38
39 data length (unsigned int),
39 data length (unsigned int),
40 data
40 data
41 """
41 """
42 def __init__(self, out, channel):
42 def __init__(self, out, channel):
43 self.out = out
43 self.out = out
44 self.channel = channel
44 self.channel = channel
45
45
46 @property
47 def name(self):
48 return '<%c-channel>' % self.channel
49
46 def write(self, data):
50 def write(self, data):
47 if not data:
51 if not data:
48 return
52 return
49 self.out.write(struct.pack('>cI', self.channel, len(data)))
53 self.out.write(struct.pack('>cI', self.channel, len(data)))
50 self.out.write(data)
54 self.out.write(data)
51 self.out.flush()
55 self.out.flush()
52
56
53 def __getattr__(self, attr):
57 def __getattr__(self, attr):
54 if attr in ('isatty', 'fileno'):
58 if attr in ('isatty', 'fileno'):
55 raise AttributeError(attr)
59 raise AttributeError(attr)
56 return getattr(self.out, attr)
60 return getattr(self.out, attr)
57
61
58 class channeledinput(object):
62 class channeledinput(object):
59 """
63 """
60 Read data from in_.
64 Read data from in_.
61
65
62 Requests for input are written to out in the following format:
66 Requests for input are written to out in the following format:
63 channel identifier - 'I' for plain input, 'L' line based (1 byte)
67 channel identifier - 'I' for plain input, 'L' line based (1 byte)
64 how many bytes to send at most (unsigned int),
68 how many bytes to send at most (unsigned int),
65
69
66 The client replies with:
70 The client replies with:
67 data length (unsigned int), 0 meaning EOF
71 data length (unsigned int), 0 meaning EOF
68 data
72 data
69 """
73 """
70
74
71 maxchunksize = 4 * 1024
75 maxchunksize = 4 * 1024
72
76
73 def __init__(self, in_, out, channel):
77 def __init__(self, in_, out, channel):
74 self.in_ = in_
78 self.in_ = in_
75 self.out = out
79 self.out = out
76 self.channel = channel
80 self.channel = channel
77
81
82 @property
83 def name(self):
84 return '<%c-channel>' % self.channel
85
78 def read(self, size=-1):
86 def read(self, size=-1):
79 if size < 0:
87 if size < 0:
80 # if we need to consume all the clients input, ask for 4k chunks
88 # if we need to consume all the clients input, ask for 4k chunks
81 # so the pipe doesn't fill up risking a deadlock
89 # so the pipe doesn't fill up risking a deadlock
82 size = self.maxchunksize
90 size = self.maxchunksize
83 s = self._read(size, self.channel)
91 s = self._read(size, self.channel)
84 buf = s
92 buf = s
85 while s:
93 while s:
86 s = self._read(size, self.channel)
94 s = self._read(size, self.channel)
87 buf += s
95 buf += s
88
96
89 return buf
97 return buf
90 else:
98 else:
91 return self._read(size, self.channel)
99 return self._read(size, self.channel)
92
100
93 def _read(self, size, channel):
101 def _read(self, size, channel):
94 if not size:
102 if not size:
95 return ''
103 return ''
96 assert size > 0
104 assert size > 0
97
105
98 # tell the client we need at most size bytes
106 # tell the client we need at most size bytes
99 self.out.write(struct.pack('>cI', channel, size))
107 self.out.write(struct.pack('>cI', channel, size))
100 self.out.flush()
108 self.out.flush()
101
109
102 length = self.in_.read(4)
110 length = self.in_.read(4)
103 length = struct.unpack('>I', length)[0]
111 length = struct.unpack('>I', length)[0]
104 if not length:
112 if not length:
105 return ''
113 return ''
106 else:
114 else:
107 return self.in_.read(length)
115 return self.in_.read(length)
108
116
109 def readline(self, size=-1):
117 def readline(self, size=-1):
110 if size < 0:
118 if size < 0:
111 size = self.maxchunksize
119 size = self.maxchunksize
112 s = self._read(size, 'L')
120 s = self._read(size, 'L')
113 buf = s
121 buf = s
114 # keep asking for more until there's either no more or
122 # keep asking for more until there's either no more or
115 # we got a full line
123 # we got a full line
116 while s and s[-1] != '\n':
124 while s and s[-1] != '\n':
117 s = self._read(size, 'L')
125 s = self._read(size, 'L')
118 buf += s
126 buf += s
119
127
120 return buf
128 return buf
121 else:
129 else:
122 return self._read(size, 'L')
130 return self._read(size, 'L')
123
131
124 def __iter__(self):
132 def __iter__(self):
125 return self
133 return self
126
134
127 def next(self):
135 def next(self):
128 l = self.readline()
136 l = self.readline()
129 if not l:
137 if not l:
130 raise StopIteration
138 raise StopIteration
131 return l
139 return l
132
140
133 def __getattr__(self, attr):
141 def __getattr__(self, attr):
134 if attr in ('isatty', 'fileno'):
142 if attr in ('isatty', 'fileno'):
135 raise AttributeError(attr)
143 raise AttributeError(attr)
136 return getattr(self.in_, attr)
144 return getattr(self.in_, attr)
137
145
138 class server(object):
146 class server(object):
139 """
147 """
140 Listens for commands on fin, runs them and writes the output on a channel
148 Listens for commands on fin, runs them and writes the output on a channel
141 based stream to fout.
149 based stream to fout.
142 """
150 """
143 def __init__(self, ui, repo, fin, fout):
151 def __init__(self, ui, repo, fin, fout):
144 self.cwd = os.getcwd()
152 self.cwd = os.getcwd()
145
153
146 # developer config: cmdserver.log
154 # developer config: cmdserver.log
147 logpath = ui.config("cmdserver", "log", None)
155 logpath = ui.config("cmdserver", "log", None)
148 if logpath:
156 if logpath:
149 global logfile
157 global logfile
150 if logpath == '-':
158 if logpath == '-':
151 # write log on a special 'd' (debug) channel
159 # write log on a special 'd' (debug) channel
152 logfile = channeledoutput(fout, 'd')
160 logfile = channeledoutput(fout, 'd')
153 else:
161 else:
154 logfile = open(logpath, 'a')
162 logfile = open(logpath, 'a')
155
163
156 if repo:
164 if repo:
157 # the ui here is really the repo ui so take its baseui so we don't
165 # the ui here is really the repo ui so take its baseui so we don't
158 # end up with its local configuration
166 # end up with its local configuration
159 self.ui = repo.baseui
167 self.ui = repo.baseui
160 self.repo = repo
168 self.repo = repo
161 self.repoui = repo.ui
169 self.repoui = repo.ui
162 else:
170 else:
163 self.ui = ui
171 self.ui = ui
164 self.repo = self.repoui = None
172 self.repo = self.repoui = None
165
173
166 self.cerr = channeledoutput(fout, 'e')
174 self.cerr = channeledoutput(fout, 'e')
167 self.cout = channeledoutput(fout, 'o')
175 self.cout = channeledoutput(fout, 'o')
168 self.cin = channeledinput(fin, fout, 'I')
176 self.cin = channeledinput(fin, fout, 'I')
169 self.cresult = channeledoutput(fout, 'r')
177 self.cresult = channeledoutput(fout, 'r')
170
178
171 self.client = fin
179 self.client = fin
172
180
173 def _read(self, size):
181 def _read(self, size):
174 if not size:
182 if not size:
175 return ''
183 return ''
176
184
177 data = self.client.read(size)
185 data = self.client.read(size)
178
186
179 # is the other end closed?
187 # is the other end closed?
180 if not data:
188 if not data:
181 raise EOFError
189 raise EOFError
182
190
183 return data
191 return data
184
192
185 def runcommand(self):
193 def runcommand(self):
186 """ reads a list of \0 terminated arguments, executes
194 """ reads a list of \0 terminated arguments, executes
187 and writes the return code to the result channel """
195 and writes the return code to the result channel """
188 from . import dispatch # avoid cycle
196 from . import dispatch # avoid cycle
189
197
190 length = struct.unpack('>I', self._read(4))[0]
198 length = struct.unpack('>I', self._read(4))[0]
191 if not length:
199 if not length:
192 args = []
200 args = []
193 else:
201 else:
194 args = self._read(length).split('\0')
202 args = self._read(length).split('\0')
195
203
196 # copy the uis so changes (e.g. --config or --verbose) don't
204 # copy the uis so changes (e.g. --config or --verbose) don't
197 # persist between requests
205 # persist between requests
198 copiedui = self.ui.copy()
206 copiedui = self.ui.copy()
199 uis = [copiedui]
207 uis = [copiedui]
200 if self.repo:
208 if self.repo:
201 self.repo.baseui = copiedui
209 self.repo.baseui = copiedui
202 # clone ui without using ui.copy because this is protected
210 # clone ui without using ui.copy because this is protected
203 repoui = self.repoui.__class__(self.repoui)
211 repoui = self.repoui.__class__(self.repoui)
204 repoui.copy = copiedui.copy # redo copy protection
212 repoui.copy = copiedui.copy # redo copy protection
205 uis.append(repoui)
213 uis.append(repoui)
206 self.repo.ui = self.repo.dirstate._ui = repoui
214 self.repo.ui = self.repo.dirstate._ui = repoui
207 self.repo.invalidateall()
215 self.repo.invalidateall()
208
216
209 for ui in uis:
217 for ui in uis:
210 # any kind of interaction must use server channels
218 # any kind of interaction must use server channels
211 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
219 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
212
220
213 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
221 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
214 self.cout, self.cerr)
222 self.cout, self.cerr)
215
223
216 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
224 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
217
225
218 # restore old cwd
226 # restore old cwd
219 if '--cwd' in args:
227 if '--cwd' in args:
220 os.chdir(self.cwd)
228 os.chdir(self.cwd)
221
229
222 self.cresult.write(struct.pack('>i', int(ret)))
230 self.cresult.write(struct.pack('>i', int(ret)))
223
231
224 def getencoding(self):
232 def getencoding(self):
225 """ writes the current encoding to the result channel """
233 """ writes the current encoding to the result channel """
226 self.cresult.write(encoding.encoding)
234 self.cresult.write(encoding.encoding)
227
235
228 def serveone(self):
236 def serveone(self):
229 cmd = self.client.readline()[:-1]
237 cmd = self.client.readline()[:-1]
230 if cmd:
238 if cmd:
231 handler = self.capabilities.get(cmd)
239 handler = self.capabilities.get(cmd)
232 if handler:
240 if handler:
233 handler(self)
241 handler(self)
234 else:
242 else:
235 # clients are expected to check what commands are supported by
243 # clients are expected to check what commands are supported by
236 # looking at the servers capabilities
244 # looking at the servers capabilities
237 raise error.Abort(_('unknown command %s') % cmd)
245 raise error.Abort(_('unknown command %s') % cmd)
238
246
239 return cmd != ''
247 return cmd != ''
240
248
241 capabilities = {'runcommand' : runcommand,
249 capabilities = {'runcommand' : runcommand,
242 'getencoding' : getencoding}
250 'getencoding' : getencoding}
243
251
244 def serve(self):
252 def serve(self):
245 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
253 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
246 hellomsg += '\n'
254 hellomsg += '\n'
247 hellomsg += 'encoding: ' + encoding.encoding
255 hellomsg += 'encoding: ' + encoding.encoding
248 hellomsg += '\n'
256 hellomsg += '\n'
249 hellomsg += 'pid: %d' % os.getpid()
257 hellomsg += 'pid: %d' % os.getpid()
250
258
251 # write the hello msg in -one- chunk
259 # write the hello msg in -one- chunk
252 self.cout.write(hellomsg)
260 self.cout.write(hellomsg)
253
261
254 try:
262 try:
255 while self.serveone():
263 while self.serveone():
256 pass
264 pass
257 except EOFError:
265 except EOFError:
258 # we'll get here if the client disconnected while we were reading
266 # we'll get here if the client disconnected while we were reading
259 # its request
267 # its request
260 return 1
268 return 1
261
269
262 return 0
270 return 0
263
271
264 def _protectio(ui):
272 def _protectio(ui):
265 """ duplicates streams and redirect original to null if ui uses stdio """
273 """ duplicates streams and redirect original to null if ui uses stdio """
266 ui.flush()
274 ui.flush()
267 newfiles = []
275 newfiles = []
268 nullfd = os.open(os.devnull, os.O_RDWR)
276 nullfd = os.open(os.devnull, os.O_RDWR)
269 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
277 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
270 (ui.fout, sys.stdout, 'wb')]:
278 (ui.fout, sys.stdout, 'wb')]:
271 if f is sysf:
279 if f is sysf:
272 newfd = os.dup(f.fileno())
280 newfd = os.dup(f.fileno())
273 os.dup2(nullfd, f.fileno())
281 os.dup2(nullfd, f.fileno())
274 f = os.fdopen(newfd, mode)
282 f = os.fdopen(newfd, mode)
275 newfiles.append(f)
283 newfiles.append(f)
276 os.close(nullfd)
284 os.close(nullfd)
277 return tuple(newfiles)
285 return tuple(newfiles)
278
286
279 def _restoreio(ui, fin, fout):
287 def _restoreio(ui, fin, fout):
280 """ restores streams from duplicated ones """
288 """ restores streams from duplicated ones """
281 ui.flush()
289 ui.flush()
282 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
290 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
283 if f is not uif:
291 if f is not uif:
284 os.dup2(f.fileno(), uif.fileno())
292 os.dup2(f.fileno(), uif.fileno())
285 f.close()
293 f.close()
286
294
287 class pipeservice(object):
295 class pipeservice(object):
288 def __init__(self, ui, repo, opts):
296 def __init__(self, ui, repo, opts):
289 self.ui = ui
297 self.ui = ui
290 self.repo = repo
298 self.repo = repo
291
299
292 def init(self):
300 def init(self):
293 pass
301 pass
294
302
295 def run(self):
303 def run(self):
296 ui = self.ui
304 ui = self.ui
297 # redirect stdio to null device so that broken extensions or in-process
305 # redirect stdio to null device so that broken extensions or in-process
298 # hooks will never cause corruption of channel protocol.
306 # hooks will never cause corruption of channel protocol.
299 fin, fout = _protectio(ui)
307 fin, fout = _protectio(ui)
300 try:
308 try:
301 sv = server(ui, self.repo, fin, fout)
309 sv = server(ui, self.repo, fin, fout)
302 return sv.serve()
310 return sv.serve()
303 finally:
311 finally:
304 _restoreio(ui, fin, fout)
312 _restoreio(ui, fin, fout)
305
313
306 class _requesthandler(SocketServer.StreamRequestHandler):
314 class _requesthandler(SocketServer.StreamRequestHandler):
307 def handle(self):
315 def handle(self):
308 ui = self.server.ui
316 ui = self.server.ui
309 repo = self.server.repo
317 repo = self.server.repo
310 sv = server(ui, repo, self.rfile, self.wfile)
318 sv = server(ui, repo, self.rfile, self.wfile)
311 try:
319 try:
312 try:
320 try:
313 sv.serve()
321 sv.serve()
314 # handle exceptions that may be raised by command server. most of
322 # handle exceptions that may be raised by command server. most of
315 # known exceptions are caught by dispatch.
323 # known exceptions are caught by dispatch.
316 except error.Abort as inst:
324 except error.Abort as inst:
317 ui.warn(_('abort: %s\n') % inst)
325 ui.warn(_('abort: %s\n') % inst)
318 except IOError as inst:
326 except IOError as inst:
319 if inst.errno != errno.EPIPE:
327 if inst.errno != errno.EPIPE:
320 raise
328 raise
321 except KeyboardInterrupt:
329 except KeyboardInterrupt:
322 pass
330 pass
323 except: # re-raises
331 except: # re-raises
324 # also write traceback to error channel. otherwise client cannot
332 # also write traceback to error channel. otherwise client cannot
325 # see it because it is written to server's stderr by default.
333 # see it because it is written to server's stderr by default.
326 traceback.print_exc(file=sv.cerr)
334 traceback.print_exc(file=sv.cerr)
327 raise
335 raise
328
336
329 class unixservice(object):
337 class unixservice(object):
330 """
338 """
331 Listens on unix domain socket and forks server per connection
339 Listens on unix domain socket and forks server per connection
332 """
340 """
333 def __init__(self, ui, repo, opts):
341 def __init__(self, ui, repo, opts):
334 self.ui = ui
342 self.ui = ui
335 self.repo = repo
343 self.repo = repo
336 self.address = opts['address']
344 self.address = opts['address']
337 if not util.safehasattr(SocketServer, 'UnixStreamServer'):
345 if not util.safehasattr(SocketServer, 'UnixStreamServer'):
338 raise error.Abort(_('unsupported platform'))
346 raise error.Abort(_('unsupported platform'))
339 if not self.address:
347 if not self.address:
340 raise error.Abort(_('no socket path specified with --address'))
348 raise error.Abort(_('no socket path specified with --address'))
341
349
342 def init(self):
350 def init(self):
343 class cls(SocketServer.ForkingMixIn, SocketServer.UnixStreamServer):
351 class cls(SocketServer.ForkingMixIn, SocketServer.UnixStreamServer):
344 ui = self.ui
352 ui = self.ui
345 repo = self.repo
353 repo = self.repo
346 self.server = cls(self.address, _requesthandler)
354 self.server = cls(self.address, _requesthandler)
347 self.ui.status(_('listening at %s\n') % self.address)
355 self.ui.status(_('listening at %s\n') % self.address)
348 self.ui.flush() # avoid buffering of status message
356 self.ui.flush() # avoid buffering of status message
349
357
350 def run(self):
358 def run(self):
351 try:
359 try:
352 self.server.serve_forever()
360 self.server.serve_forever()
353 finally:
361 finally:
354 os.unlink(self.address)
362 os.unlink(self.address)
355
363
356 _servicemap = {
364 _servicemap = {
357 'pipe': pipeservice,
365 'pipe': pipeservice,
358 'unix': unixservice,
366 'unix': unixservice,
359 }
367 }
360
368
361 def createservice(ui, repo, opts):
369 def createservice(ui, repo, opts):
362 mode = opts['cmdserver']
370 mode = opts['cmdserver']
363 try:
371 try:
364 return _servicemap[mode](ui, repo, opts)
372 return _servicemap[mode](ui, repo, opts)
365 except KeyError:
373 except KeyError:
366 raise error.Abort(_('unknown mode %s') % mode)
374 raise error.Abort(_('unknown mode %s') % mode)
General Comments 0
You need to be logged in to leave comments. Login now