##// END OF EJS Templates
commandserver: implement name() to clarify channel is not a plain file...
Yuya Nishihara -
r27415:f4ca33e3 default
parent child Browse files
Show More
@@ -1,366 +1,374 b''
1 1 # commandserver.py - communicate with Mercurial's API over a pipe
2 2 #
3 3 # Copyright Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import SocketServer
11 11 import errno
12 12 import os
13 13 import struct
14 14 import sys
15 15 import traceback
16 16
17 17 from .i18n import _
18 18 from . import (
19 19 encoding,
20 20 error,
21 21 util,
22 22 )
23 23
24 24 logfile = None
25 25
26 26 def log(*args):
27 27 if not logfile:
28 28 return
29 29
30 30 for a in args:
31 31 logfile.write(str(a))
32 32
33 33 logfile.flush()
34 34
35 35 class channeledoutput(object):
36 36 """
37 37 Write data to out in the following format:
38 38
39 39 data length (unsigned int),
40 40 data
41 41 """
42 42 def __init__(self, out, channel):
43 43 self.out = out
44 44 self.channel = channel
45 45
46 @property
47 def name(self):
48 return '<%c-channel>' % self.channel
49
46 50 def write(self, data):
47 51 if not data:
48 52 return
49 53 self.out.write(struct.pack('>cI', self.channel, len(data)))
50 54 self.out.write(data)
51 55 self.out.flush()
52 56
53 57 def __getattr__(self, attr):
54 58 if attr in ('isatty', 'fileno'):
55 59 raise AttributeError(attr)
56 60 return getattr(self.out, attr)
57 61
58 62 class channeledinput(object):
59 63 """
60 64 Read data from in_.
61 65
62 66 Requests for input are written to out in the following format:
63 67 channel identifier - 'I' for plain input, 'L' line based (1 byte)
64 68 how many bytes to send at most (unsigned int),
65 69
66 70 The client replies with:
67 71 data length (unsigned int), 0 meaning EOF
68 72 data
69 73 """
70 74
71 75 maxchunksize = 4 * 1024
72 76
73 77 def __init__(self, in_, out, channel):
74 78 self.in_ = in_
75 79 self.out = out
76 80 self.channel = channel
77 81
82 @property
83 def name(self):
84 return '<%c-channel>' % self.channel
85
78 86 def read(self, size=-1):
79 87 if size < 0:
80 88 # if we need to consume all the clients input, ask for 4k chunks
81 89 # so the pipe doesn't fill up risking a deadlock
82 90 size = self.maxchunksize
83 91 s = self._read(size, self.channel)
84 92 buf = s
85 93 while s:
86 94 s = self._read(size, self.channel)
87 95 buf += s
88 96
89 97 return buf
90 98 else:
91 99 return self._read(size, self.channel)
92 100
93 101 def _read(self, size, channel):
94 102 if not size:
95 103 return ''
96 104 assert size > 0
97 105
98 106 # tell the client we need at most size bytes
99 107 self.out.write(struct.pack('>cI', channel, size))
100 108 self.out.flush()
101 109
102 110 length = self.in_.read(4)
103 111 length = struct.unpack('>I', length)[0]
104 112 if not length:
105 113 return ''
106 114 else:
107 115 return self.in_.read(length)
108 116
109 117 def readline(self, size=-1):
110 118 if size < 0:
111 119 size = self.maxchunksize
112 120 s = self._read(size, 'L')
113 121 buf = s
114 122 # keep asking for more until there's either no more or
115 123 # we got a full line
116 124 while s and s[-1] != '\n':
117 125 s = self._read(size, 'L')
118 126 buf += s
119 127
120 128 return buf
121 129 else:
122 130 return self._read(size, 'L')
123 131
124 132 def __iter__(self):
125 133 return self
126 134
127 135 def next(self):
128 136 l = self.readline()
129 137 if not l:
130 138 raise StopIteration
131 139 return l
132 140
133 141 def __getattr__(self, attr):
134 142 if attr in ('isatty', 'fileno'):
135 143 raise AttributeError(attr)
136 144 return getattr(self.in_, attr)
137 145
138 146 class server(object):
139 147 """
140 148 Listens for commands on fin, runs them and writes the output on a channel
141 149 based stream to fout.
142 150 """
143 151 def __init__(self, ui, repo, fin, fout):
144 152 self.cwd = os.getcwd()
145 153
146 154 # developer config: cmdserver.log
147 155 logpath = ui.config("cmdserver", "log", None)
148 156 if logpath:
149 157 global logfile
150 158 if logpath == '-':
151 159 # write log on a special 'd' (debug) channel
152 160 logfile = channeledoutput(fout, 'd')
153 161 else:
154 162 logfile = open(logpath, 'a')
155 163
156 164 if repo:
157 165 # the ui here is really the repo ui so take its baseui so we don't
158 166 # end up with its local configuration
159 167 self.ui = repo.baseui
160 168 self.repo = repo
161 169 self.repoui = repo.ui
162 170 else:
163 171 self.ui = ui
164 172 self.repo = self.repoui = None
165 173
166 174 self.cerr = channeledoutput(fout, 'e')
167 175 self.cout = channeledoutput(fout, 'o')
168 176 self.cin = channeledinput(fin, fout, 'I')
169 177 self.cresult = channeledoutput(fout, 'r')
170 178
171 179 self.client = fin
172 180
173 181 def _read(self, size):
174 182 if not size:
175 183 return ''
176 184
177 185 data = self.client.read(size)
178 186
179 187 # is the other end closed?
180 188 if not data:
181 189 raise EOFError
182 190
183 191 return data
184 192
185 193 def runcommand(self):
186 194 """ reads a list of \0 terminated arguments, executes
187 195 and writes the return code to the result channel """
188 196 from . import dispatch # avoid cycle
189 197
190 198 length = struct.unpack('>I', self._read(4))[0]
191 199 if not length:
192 200 args = []
193 201 else:
194 202 args = self._read(length).split('\0')
195 203
196 204 # copy the uis so changes (e.g. --config or --verbose) don't
197 205 # persist between requests
198 206 copiedui = self.ui.copy()
199 207 uis = [copiedui]
200 208 if self.repo:
201 209 self.repo.baseui = copiedui
202 210 # clone ui without using ui.copy because this is protected
203 211 repoui = self.repoui.__class__(self.repoui)
204 212 repoui.copy = copiedui.copy # redo copy protection
205 213 uis.append(repoui)
206 214 self.repo.ui = self.repo.dirstate._ui = repoui
207 215 self.repo.invalidateall()
208 216
209 217 for ui in uis:
210 218 # any kind of interaction must use server channels
211 219 ui.setconfig('ui', 'nontty', 'true', 'commandserver')
212 220
213 221 req = dispatch.request(args[:], copiedui, self.repo, self.cin,
214 222 self.cout, self.cerr)
215 223
216 224 ret = (dispatch.dispatch(req) or 0) & 255 # might return None
217 225
218 226 # restore old cwd
219 227 if '--cwd' in args:
220 228 os.chdir(self.cwd)
221 229
222 230 self.cresult.write(struct.pack('>i', int(ret)))
223 231
224 232 def getencoding(self):
225 233 """ writes the current encoding to the result channel """
226 234 self.cresult.write(encoding.encoding)
227 235
228 236 def serveone(self):
229 237 cmd = self.client.readline()[:-1]
230 238 if cmd:
231 239 handler = self.capabilities.get(cmd)
232 240 if handler:
233 241 handler(self)
234 242 else:
235 243 # clients are expected to check what commands are supported by
236 244 # looking at the servers capabilities
237 245 raise error.Abort(_('unknown command %s') % cmd)
238 246
239 247 return cmd != ''
240 248
241 249 capabilities = {'runcommand' : runcommand,
242 250 'getencoding' : getencoding}
243 251
244 252 def serve(self):
245 253 hellomsg = 'capabilities: ' + ' '.join(sorted(self.capabilities))
246 254 hellomsg += '\n'
247 255 hellomsg += 'encoding: ' + encoding.encoding
248 256 hellomsg += '\n'
249 257 hellomsg += 'pid: %d' % os.getpid()
250 258
251 259 # write the hello msg in -one- chunk
252 260 self.cout.write(hellomsg)
253 261
254 262 try:
255 263 while self.serveone():
256 264 pass
257 265 except EOFError:
258 266 # we'll get here if the client disconnected while we were reading
259 267 # its request
260 268 return 1
261 269
262 270 return 0
263 271
264 272 def _protectio(ui):
265 273 """ duplicates streams and redirect original to null if ui uses stdio """
266 274 ui.flush()
267 275 newfiles = []
268 276 nullfd = os.open(os.devnull, os.O_RDWR)
269 277 for f, sysf, mode in [(ui.fin, sys.stdin, 'rb'),
270 278 (ui.fout, sys.stdout, 'wb')]:
271 279 if f is sysf:
272 280 newfd = os.dup(f.fileno())
273 281 os.dup2(nullfd, f.fileno())
274 282 f = os.fdopen(newfd, mode)
275 283 newfiles.append(f)
276 284 os.close(nullfd)
277 285 return tuple(newfiles)
278 286
279 287 def _restoreio(ui, fin, fout):
280 288 """ restores streams from duplicated ones """
281 289 ui.flush()
282 290 for f, uif in [(fin, ui.fin), (fout, ui.fout)]:
283 291 if f is not uif:
284 292 os.dup2(f.fileno(), uif.fileno())
285 293 f.close()
286 294
287 295 class pipeservice(object):
288 296 def __init__(self, ui, repo, opts):
289 297 self.ui = ui
290 298 self.repo = repo
291 299
292 300 def init(self):
293 301 pass
294 302
295 303 def run(self):
296 304 ui = self.ui
297 305 # redirect stdio to null device so that broken extensions or in-process
298 306 # hooks will never cause corruption of channel protocol.
299 307 fin, fout = _protectio(ui)
300 308 try:
301 309 sv = server(ui, self.repo, fin, fout)
302 310 return sv.serve()
303 311 finally:
304 312 _restoreio(ui, fin, fout)
305 313
306 314 class _requesthandler(SocketServer.StreamRequestHandler):
307 315 def handle(self):
308 316 ui = self.server.ui
309 317 repo = self.server.repo
310 318 sv = server(ui, repo, self.rfile, self.wfile)
311 319 try:
312 320 try:
313 321 sv.serve()
314 322 # handle exceptions that may be raised by command server. most of
315 323 # known exceptions are caught by dispatch.
316 324 except error.Abort as inst:
317 325 ui.warn(_('abort: %s\n') % inst)
318 326 except IOError as inst:
319 327 if inst.errno != errno.EPIPE:
320 328 raise
321 329 except KeyboardInterrupt:
322 330 pass
323 331 except: # re-raises
324 332 # also write traceback to error channel. otherwise client cannot
325 333 # see it because it is written to server's stderr by default.
326 334 traceback.print_exc(file=sv.cerr)
327 335 raise
328 336
329 337 class unixservice(object):
330 338 """
331 339 Listens on unix domain socket and forks server per connection
332 340 """
333 341 def __init__(self, ui, repo, opts):
334 342 self.ui = ui
335 343 self.repo = repo
336 344 self.address = opts['address']
337 345 if not util.safehasattr(SocketServer, 'UnixStreamServer'):
338 346 raise error.Abort(_('unsupported platform'))
339 347 if not self.address:
340 348 raise error.Abort(_('no socket path specified with --address'))
341 349
342 350 def init(self):
343 351 class cls(SocketServer.ForkingMixIn, SocketServer.UnixStreamServer):
344 352 ui = self.ui
345 353 repo = self.repo
346 354 self.server = cls(self.address, _requesthandler)
347 355 self.ui.status(_('listening at %s\n') % self.address)
348 356 self.ui.flush() # avoid buffering of status message
349 357
350 358 def run(self):
351 359 try:
352 360 self.server.serve_forever()
353 361 finally:
354 362 os.unlink(self.address)
355 363
356 364 _servicemap = {
357 365 'pipe': pipeservice,
358 366 'unix': unixservice,
359 367 }
360 368
361 369 def createservice(ui, repo, opts):
362 370 mode = opts['cmdserver']
363 371 try:
364 372 return _servicemap[mode](ui, repo, opts)
365 373 except KeyError:
366 374 raise error.Abort(_('unknown mode %s') % mode)
General Comments 0
You need to be logged in to leave comments. Login now