##// END OF EJS Templates
thirdparty: remove Python 2-specific selectors2 copy...
Manuel Jacob -
r50175:311fcc5a default
parent child Browse files
Show More
@@ -1,770 +1,756 b''
1 # commandserver.py - communicate with Mercurial's API over a pipe
1 # commandserver.py - communicate with Mercurial's API over a pipe
2 #
2 #
3 # Copyright Olivia Mackall <olivia@selenic.com>
3 # Copyright Olivia Mackall <olivia@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import errno
9 import errno
10 import gc
10 import gc
11 import os
11 import os
12 import random
12 import random
13 import selectors
13 import signal
14 import signal
14 import socket
15 import socket
15 import struct
16 import struct
16 import traceback
17 import traceback
17
18
18 try:
19 import selectors
20
21 selectors.BaseSelector
22 except ImportError:
23 from .thirdparty import selectors2 as selectors
24
25 from .i18n import _
19 from .i18n import _
26 from .pycompat import getattr
20 from .pycompat import getattr
27 from . import (
21 from . import (
28 encoding,
22 encoding,
29 error,
23 error,
30 loggingutil,
24 loggingutil,
31 pycompat,
25 pycompat,
32 repocache,
26 repocache,
33 util,
27 util,
34 vfs as vfsmod,
28 vfs as vfsmod,
35 )
29 )
36 from .utils import (
30 from .utils import (
37 cborutil,
31 cborutil,
38 procutil,
32 procutil,
39 )
33 )
40
34
41
35
class channeledoutput:
    """Frame outgoing writes onto ``out`` as length-prefixed channel packets.

    Wire format per write():
        channel identifier (1 byte),
        data length (big-endian unsigned int),
        data
    """

    def __init__(self, out, channel):
        self.out = out
        self.channel = channel

    @property
    def name(self):
        # synthesized name such as b'<o-channel>' for error reporting
        return b'<%c-channel>' % self.channel

    def write(self, data):
        if not data:
            return
        # build the frame and emit it with a single write() so we inherit
        # the same atomicity guarantee as the underlying file object
        header = struct.pack(b'>cI', self.channel, len(data))
        self.out.write(header + data)
        self.out.flush()

    def __getattr__(self, attr):
        # refuse file-like capabilities a channel cannot honestly provide;
        # everything else is delegated to the wrapped stream
        if attr in ('isatty', 'fileno', 'tell', 'seek'):
            raise AttributeError(attr)
        return getattr(self.out, attr)
69
63
70
64
class channeledmessage:
    """
    Write encoded message and metadata to out in the following format:

    data length (unsigned int),
    encoded message and metadata, as a flat key-value dict.

    Each message should have 'type' attribute. Messages of unknown type
    should be ignored.
    """

    # teach ui that write() can take **opts
    structured = True

    def __init__(self, out, channel, encodename, encodefn):
        self._cout = channeledoutput(out, channel)
        self.encoding = encodename
        self._encodefn = encodefn

    def write(self, data, **opts):
        # normalize keyword keys to bytes, fold the payload in under
        # b'data', then ship the encoded dict over the channel
        fields = pycompat.byteskwargs(opts)
        if data is not None:
            fields[b'data'] = data
        self._cout.write(self._encodefn(fields))

    def __getattr__(self, attr):
        # anything not defined here falls through to the channel writer
        return getattr(self._cout, attr)
98
92
99
93
class channeledinput:
    """Request and read client input over a channel pair.

    Requests for input are written to out in the following format:
        channel identifier - 'I' for plain input, 'L' line based (1 byte)
        how many bytes to send at most (unsigned int)

    The client replies with:
        data length (unsigned int), 0 meaning EOF
        data
    """

    # cap per-request size so the reply pipe cannot fill up and deadlock
    maxchunksize = 4 * 1024

    def __init__(self, in_, out, channel):
        self.in_ = in_
        self.out = out
        self.channel = channel

    @property
    def name(self):
        return b'<%c-channel>' % self.channel

    def read(self, size=-1):
        if size >= 0:
            return self._read(size, self.channel)
        # drain the client's input in bounded chunks until EOF
        chunks = []
        while True:
            piece = self._read(self.maxchunksize, self.channel)
            if not piece:
                break
            chunks.append(piece)
        return b''.join(chunks)

    def _read(self, size, channel):
        if not size:
            return b''
        assert size > 0

        # ask the client for at most `size` bytes on `channel`
        self.out.write(struct.pack(b'>cI', channel, size))
        self.out.flush()

        # reply is a uint32 length (0 == EOF) followed by that many bytes
        length = struct.unpack(b'>I', self.in_.read(4))[0]
        if not length:
            return b''
        return self.in_.read(length)

    def readline(self, size=-1):
        if size >= 0:
            return self._read(size, b'L')
        # keep requesting chunks until EOF or a complete line arrives
        pieces = []
        while True:
            piece = self._read(self.maxchunksize, b'L')
            pieces.append(piece)
            if not piece or piece.endswith(b'\n'):
                break
        return b''.join(pieces)

    def __iter__(self):
        return self

    def __next__(self):
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    # Python 2-era alias kept for API compatibility
    next = __next__

    def __getattr__(self, attr):
        # refuse file-like capabilities a channel cannot honestly provide
        if attr in ('isatty', 'fileno', 'tell', 'seek'):
            raise AttributeError(attr)
        return getattr(self.in_, attr)
185
179
186
180
# Registry of message encodings supported on the 'm' channel, mapping the
# encoding name to a function that serializes a flat dict to bytes.
_messageencoders = {
    b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
}
190
184
191
185
def _selectmessageencoder(ui):
    """Pick the first configured message encoding we know how to produce.

    Scans cmdserver.message-encodings in order and returns a
    (name, encoder) pair; aborts when none of the listed encodings is
    present in the _messageencoders registry.
    """
    encnames = ui.configlist(b'cmdserver', b'message-encodings')
    for name in encnames:
        encoder = _messageencoders.get(name)
        if encoder:
            return name, encoder
    raise error.Abort(
        b'no supported message encodings: %s' % b' '.join(encnames)
    )
201
195
202
196
class server:
    """
    Listens for commands on fin, runs them and writes the output on a channel
    based stream to fout.
    """

    def __init__(self, ui, repo, fin, fout, prereposetups=None):
        # remember the starting cwd so runcommand() can restore it after
        # a --cwd invocation
        self.cwd = encoding.getcwd()

        if repo:
            # the ui here is really the repo ui so take its baseui so we don't
            # end up with its local configuration
            self.ui = repo.baseui
            self.repo = repo
            self.repoui = repo.ui
        else:
            self.ui = ui
            self.repo = self.repoui = None
        self._prereposetups = prereposetups

        # one output channel per stream: d=debug, e=error, o=output,
        # I=input requests, r=result codes
        self.cdebug = channeledoutput(fout, b'd')
        self.cerr = channeledoutput(fout, b'e')
        self.cout = channeledoutput(fout, b'o')
        self.cin = channeledinput(fin, fout, b'I')
        self.cresult = channeledoutput(fout, b'r')

        if self.ui.config(b'cmdserver', b'log') == b'-':
            # switch log stream of server's ui to the 'd' (debug) channel
            # (don't touch repo.ui as its lifetime is longer than the server)
            self.ui = self.ui.copy()
            setuplogging(self.ui, repo=None, fp=self.cdebug)

        # optional structured message channel ('m'), enabled by
        # ui.message-output=channel
        self.cmsg = None
        if ui.config(b'ui', b'message-output') == b'channel':
            encname, encfn = _selectmessageencoder(ui)
            self.cmsg = channeledmessage(fout, b'm', encname, encfn)

        self.client = fin

        # If shutdown-on-interrupt is off, the default SIGINT handler is
        # removed so that client-server communication wouldn't be interrupted.
        # For example, 'runcommand' handler will issue three short read()s.
        # If one of the first two read()s were interrupted, the communication
        # channel would be left at dirty state and the subsequent request
        # wouldn't be parsed. So catching KeyboardInterrupt isn't enough.
        self._shutdown_on_interrupt = ui.configbool(
            b'cmdserver', b'shutdown-on-interrupt'
        )
        self._old_inthandler = None
        if not self._shutdown_on_interrupt:
            self._old_inthandler = signal.signal(signal.SIGINT, signal.SIG_IGN)

    def cleanup(self):
        """release and restore resources taken during server session"""
        if not self._shutdown_on_interrupt:
            # put back the SIGINT handler we displaced in __init__
            signal.signal(signal.SIGINT, self._old_inthandler)

    def _read(self, size):
        """Read exactly up to `size` bytes from the client, b'' for size 0.

        Raises EOFError when the client has closed its end.
        """
        if not size:
            return b''

        data = self.client.read(size)

        # is the other end closed?
        if not data:
            raise EOFError

        return data

    def _readstr(self):
        """read a string from the channel

        format:
        data length (uint32), data
        """
        length = struct.unpack(b'>I', self._read(4))[0]
        if not length:
            return b''
        return self._read(length)

    def _readlist(self):
        """read a list of NULL separated strings from the channel"""
        s = self._readstr()
        if s:
            return s.split(b'\0')
        else:
            return []

    def _dispatchcommand(self, req):
        """Run one dispatch request, juggling the SIGINT handler around it.

        While dispatch() runs, the original SIGINT handler is restored so
        Ctrl-C behaves normally; it is re-ignored before the result is
        written back so the protocol stream cannot be corrupted.
        """
        from . import dispatch  # avoid cycle

        if self._shutdown_on_interrupt:
            # no need to restore SIGINT handler as it is unmodified.
            return dispatch.dispatch(req)

        try:
            signal.signal(signal.SIGINT, self._old_inthandler)
            return dispatch.dispatch(req)
        except error.SignalInterrupt:
            # propagate SIGBREAK, SIGHUP, or SIGTERM.
            raise
        except KeyboardInterrupt:
            # SIGINT may be received out of the try-except block of dispatch(),
            # so catch it as last ditch. Another KeyboardInterrupt may be
            # raised while handling exceptions here, but there's no way to
            # avoid that except for doing everything in C.
            pass
        finally:
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        # On KeyboardInterrupt, print error message and exit *after* SIGINT
        # handler removed.
        req.ui.error(_(b'interrupted!\n'))
        return -1

    def runcommand(self):
        """reads a list of \0 terminated arguments, executes
        and writes the return code to the result channel"""
        from . import dispatch  # avoid cycle

        args = self._readlist()

        # copy the uis so changes (e.g. --config or --verbose) don't
        # persist between requests
        copiedui = self.ui.copy()
        uis = [copiedui]
        if self.repo:
            self.repo.baseui = copiedui
            # clone ui without using ui.copy because this is protected
            repoui = self.repoui.__class__(self.repoui)
            repoui.copy = copiedui.copy  # redo copy protection
            uis.append(repoui)
            self.repo.ui = self.repo.dirstate._ui = repoui
            self.repo.invalidateall()

        for ui in uis:
            ui.resetstate()
            # any kind of interaction must use server channels, but chg may
            # replace channels by fully functional tty files. so nontty is
            # enforced only if cin is a channel.
            if not util.safehasattr(self.cin, b'fileno'):
                ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')

        req = dispatch.request(
            args[:],
            copiedui,
            self.repo,
            self.cin,
            self.cout,
            self.cerr,
            self.cmsg,
            prereposetups=self._prereposetups,
        )

        try:
            ret = self._dispatchcommand(req) & 255
            # If shutdown-on-interrupt is off, it's important to write the
            # result code *after* SIGINT handler removed. If the result code
            # were lost, the client wouldn't be able to continue processing.
            self.cresult.write(struct.pack(b'>i', int(ret)))
        finally:
            # restore old cwd
            if b'--cwd' in args:
                os.chdir(self.cwd)

    def getencoding(self):
        """writes the current encoding to the result channel"""
        self.cresult.write(encoding.encoding)

    def serveone(self):
        """Read and execute one request; return False once the client quits.

        An empty command line signals the end of the session.
        """
        cmd = self.client.readline()[:-1]
        if cmd:
            handler = self.capabilities.get(cmd)
            if handler:
                handler(self)
            else:
                # clients are expected to check what commands are supported by
                # looking at the servers capabilities
                raise error.Abort(_(b'unknown command %s') % cmd)

        return cmd != b''

    # request name -> unbound handler method; also advertised in the hello
    # message so clients can discover what the server supports
    capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}

    def serve(self):
        """Send the hello banner and then serve requests until EOF.

        Returns 0 on clean shutdown, 1 if the client disconnected
        mid-request.
        """
        hellomsg = b'capabilities: ' + b' '.join(sorted(self.capabilities))
        hellomsg += b'\n'
        hellomsg += b'encoding: ' + encoding.encoding
        hellomsg += b'\n'
        if self.cmsg:
            hellomsg += b'message-encoding: %s\n' % self.cmsg.encoding
        hellomsg += b'pid: %d' % procutil.getpid()
        if util.safehasattr(os, b'getpgid'):
            hellomsg += b'\n'
            hellomsg += b'pgid: %d' % os.getpgid(0)

        # write the hello msg in -one- chunk
        self.cout.write(hellomsg)

        try:
            while self.serveone():
                pass
        except EOFError:
            # we'll get here if the client disconnected while we were reading
            # its request
            return 1

        return 0
410
404
411
405
def setuplogging(ui, repo=None, fp=None):
    """Set up server logging facility

    If cmdserver.log is '-', log messages will be sent to the given fp.
    It should be the 'd' channel while a client is connected, and otherwise
    is the stderr of the server process.
    """
    # developer config: cmdserver.log
    logpath = ui.config(b'cmdserver', b'log')
    if not logpath:
        return
    # developer config: cmdserver.track-log
    tracked = set(ui.configlist(b'cmdserver', b'track-log'))

    if logpath == b'-':
        # stream to the supplied fp (the 'd' channel) when we have one,
        # otherwise to the server process's stderr
        dest = fp if fp else ui.ferr
        logger = loggingutil.fileobjectlogger(dest, tracked)
    else:
        fullpath = util.abspath(util.expandpath(logpath))
        # developer config: cmdserver.max-log-files
        maxfiles = ui.configint(b'cmdserver', b'max-log-files')
        # developer config: cmdserver.max-log-size
        maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
        vfs = vfsmod.vfs(os.path.dirname(fullpath))
        logger = loggingutil.filelogger(
            vfs,
            os.path.basename(fullpath),
            tracked,
            maxfiles=maxfiles,
            maxsize=maxsize,
        )

    # install the logger on every ui object that may emit cmdserver events
    targetuis = {ui}
    if repo:
        targetuis.add(repo.baseui)
        targetuis.add(repo.ui)
    for u in targetuis:
        u.setlogger(b'cmdserver', logger)
451
445
452
446
class pipeservice:
    """Run a single command server session over the process's own stdio."""

    def __init__(self, ui, repo, opts):
        self.ui = ui
        self.repo = repo

    def init(self):
        # nothing to prepare; the stdio pipes already exist
        pass

    def run(self):
        # redirect stdio to null device so that broken extensions or
        # in-process hooks will never cause corruption of channel protocol.
        with self.ui.protectedfinout() as (fin, fout):
            sv = server(self.ui, self.repo, fin, fout)
            try:
                return sv.serve()
            finally:
                sv.cleanup()
471
465
472
466
def _initworkerprocess():
    """Prepare a freshly forked worker process before it serves a request."""
    # use a different process group from the master process, in order to:
    # 1. make the current process group no longer "orphaned" (because the
    #    parent of this process is in a different process group while
    #    remains in a same session)
    #    according to POSIX 2.2.2.52, orphaned process group will ignore
    #    terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
    #    cause trouble for things like ncurses.
    # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
    #    SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
    #    processes like ssh will be killed properly, without affecting
    #    unrelated processes.
    os.setpgid(0, 0)
    # change random state otherwise forked request handlers would have a
    # same state inherited from parent.
    random.seed()
489
483
490
484
def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
    """Serve one client connection ``conn`` and tear it down afterwards.

    ``createcmdserver`` builds the command server instance for this
    connection. Any unexpected exception is echoed to the client's 'e'
    channel (so the client can see it) before being re-raised.
    """
    fin = conn.makefile('rb')
    fout = conn.makefile('wb')
    sv = None
    try:
        sv = createcmdserver(repo, conn, fin, fout, prereposetups)
        try:
            sv.serve()
        # handle exceptions that may be raised by command server. most of
        # known exceptions are caught by dispatch.
        except error.Abort as inst:
            ui.error(_(b'abort: %s\n') % inst.message)
        except IOError as inst:
            # EPIPE just means the client went away; anything else is real
            if inst.errno != errno.EPIPE:
                raise
        except KeyboardInterrupt:
            pass
        finally:
            sv.cleanup()
    except:  # re-raises
        # also write traceback to error channel. otherwise client cannot
        # see it because it is written to server's stderr by default.
        if sv:
            cerr = sv.cerr
        else:
            cerr = channeledoutput(fout, b'e')
        cerr.write(encoding.strtolocal(traceback.format_exc()))
        raise
    finally:
        fin.close()
        try:
            fout.close()  # implicit flush() may cause another EPIPE
        except IOError as inst:
            if inst.errno != errno.EPIPE:
                raise
526
520
527
521
class unixservicehandler:
    """Set of pluggable operations for unix-mode services

    Almost all methods except for createcmdserver() are called in the main
    process. You can't pass mutable resource back from createcmdserver().
    """

    # seconds between shouldexit() polls; None means block indefinitely
    pollinterval = None

    def __init__(self, ui):
        self.ui = ui

    def bindsocket(self, sock, address):
        """Bind ``sock`` to the unix socket ``address`` and start listening."""
        util.bindunixsocket(sock, address)
        sock.listen(socket.SOMAXCONN)
        self.ui.status(_(b'listening at %s\n') % address)
        self.ui.flush()  # avoid buffering of status message

    def unlinksocket(self, address):
        """Remove the socket file at ``address`` during shutdown."""
        os.unlink(address)

    def shouldexit(self):
        """True if server should shut down; checked per pollinterval"""
        return False

    def newconnection(self):
        """Called when main process notices new connection"""

    def createcmdserver(self, repo, conn, fin, fout, prereposetups):
        """Create new command server instance; called in the process that
        serves for the current connection"""
        return server(self.ui, repo, fin, fout, prereposetups)
560
554
561
555
562 class unixforkingservice:
556 class unixforkingservice:
563 """
557 """
564 Listens on unix domain socket and forks server per connection
558 Listens on unix domain socket and forks server per connection
565 """
559 """
566
560
567 def __init__(self, ui, repo, opts, handler=None):
561 def __init__(self, ui, repo, opts, handler=None):
568 self.ui = ui
562 self.ui = ui
569 self.repo = repo
563 self.repo = repo
570 self.address = opts[b'address']
564 self.address = opts[b'address']
571 if not util.safehasattr(socket, b'AF_UNIX'):
565 if not util.safehasattr(socket, b'AF_UNIX'):
572 raise error.Abort(_(b'unsupported platform'))
566 raise error.Abort(_(b'unsupported platform'))
573 if not self.address:
567 if not self.address:
574 raise error.Abort(_(b'no socket path specified with --address'))
568 raise error.Abort(_(b'no socket path specified with --address'))
575 self._servicehandler = handler or unixservicehandler(ui)
569 self._servicehandler = handler or unixservicehandler(ui)
576 self._sock = None
570 self._sock = None
577 self._mainipc = None
571 self._mainipc = None
578 self._workeripc = None
572 self._workeripc = None
579 self._oldsigchldhandler = None
573 self._oldsigchldhandler = None
580 self._workerpids = set() # updated by signal handler; do not iterate
574 self._workerpids = set() # updated by signal handler; do not iterate
581 self._socketunlinked = None
575 self._socketunlinked = None
582 # experimental config: cmdserver.max-repo-cache
576 # experimental config: cmdserver.max-repo-cache
583 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
577 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
584 if maxlen < 0:
578 if maxlen < 0:
585 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
579 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
586 self._repoloader = repocache.repoloader(ui, maxlen)
580 self._repoloader = repocache.repoloader(ui, maxlen)
587 # attempt to avoid crash in CoreFoundation when using chg after fix in
581 # attempt to avoid crash in CoreFoundation when using chg after fix in
588 # a89381e04c58
582 # a89381e04c58
589 if pycompat.isdarwin:
583 if pycompat.isdarwin:
590 procutil.gui()
584 procutil.gui()
591
585
592 def init(self):
586 def init(self):
593 self._sock = socket.socket(socket.AF_UNIX)
587 self._sock = socket.socket(socket.AF_UNIX)
594 # IPC channel from many workers to one main process; this is actually
588 # IPC channel from many workers to one main process; this is actually
595 # a uni-directional pipe, but is backed by a DGRAM socket so each
589 # a uni-directional pipe, but is backed by a DGRAM socket so each
596 # message can be easily separated.
590 # message can be easily separated.
597 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
591 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
598 self._mainipc, self._workeripc = o
592 self._mainipc, self._workeripc = o
599 self._servicehandler.bindsocket(self._sock, self.address)
593 self._servicehandler.bindsocket(self._sock, self.address)
600 if util.safehasattr(procutil, b'unblocksignal'):
594 if util.safehasattr(procutil, b'unblocksignal'):
601 procutil.unblocksignal(signal.SIGCHLD)
595 procutil.unblocksignal(signal.SIGCHLD)
602 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
596 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
603 self._oldsigchldhandler = o
597 self._oldsigchldhandler = o
604 self._socketunlinked = False
598 self._socketunlinked = False
605 self._repoloader.start()
599 self._repoloader.start()
606
600
607 def _unlinksocket(self):
601 def _unlinksocket(self):
608 if not self._socketunlinked:
602 if not self._socketunlinked:
609 self._servicehandler.unlinksocket(self.address)
603 self._servicehandler.unlinksocket(self.address)
610 self._socketunlinked = True
604 self._socketunlinked = True
611
605
612 def _cleanup(self):
606 def _cleanup(self):
613 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
607 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
614 self._sock.close()
608 self._sock.close()
615 self._mainipc.close()
609 self._mainipc.close()
616 self._workeripc.close()
610 self._workeripc.close()
617 self._unlinksocket()
611 self._unlinksocket()
618 self._repoloader.stop()
612 self._repoloader.stop()
619 # don't kill child processes as they have active clients, just wait
613 # don't kill child processes as they have active clients, just wait
620 self._reapworkers(0)
614 self._reapworkers(0)
621
615
622 def run(self):
616 def run(self):
623 try:
617 try:
624 self._mainloop()
618 self._mainloop()
625 finally:
619 finally:
626 self._cleanup()
620 self._cleanup()
627
621
628 def _mainloop(self):
622 def _mainloop(self):
629 exiting = False
623 exiting = False
630 h = self._servicehandler
624 h = self._servicehandler
631 selector = selectors.DefaultSelector()
625 selector = selectors.DefaultSelector()
632 selector.register(
626 selector.register(
633 self._sock, selectors.EVENT_READ, self._acceptnewconnection
627 self._sock, selectors.EVENT_READ, self._acceptnewconnection
634 )
628 )
635 selector.register(
629 selector.register(
636 self._mainipc, selectors.EVENT_READ, self._handlemainipc
630 self._mainipc, selectors.EVENT_READ, self._handlemainipc
637 )
631 )
638 while True:
632 while True:
639 if not exiting and h.shouldexit():
633 if not exiting and h.shouldexit():
640 # clients can no longer connect() to the domain socket, so
634 # clients can no longer connect() to the domain socket, so
641 # we stop queuing new requests.
635 # we stop queuing new requests.
642 # for requests that are queued (connect()-ed, but haven't been
636 # for requests that are queued (connect()-ed, but haven't been
643 # accept()-ed), handle them before exit. otherwise, clients
637 # accept()-ed), handle them before exit. otherwise, clients
644 # waiting for recv() will receive ECONNRESET.
638 # waiting for recv() will receive ECONNRESET.
645 self._unlinksocket()
639 self._unlinksocket()
646 exiting = True
640 exiting = True
647 try:
648 events = selector.select(timeout=h.pollinterval)
641 events = selector.select(timeout=h.pollinterval)
649 except OSError as inst:
650 # selectors2 raises ETIMEDOUT if timeout exceeded while
651 # handling signal interrupt. That's probably wrong, but
652 # we can easily get around it.
653 if inst.errno != errno.ETIMEDOUT:
654 raise
655 events = []
656 if not events:
642 if not events:
657 # only exit if we completed all queued requests
643 # only exit if we completed all queued requests
658 if exiting:
644 if exiting:
659 break
645 break
660 continue
646 continue
661 for key, _mask in events:
647 for key, _mask in events:
662 key.data(key.fileobj, selector)
648 key.data(key.fileobj, selector)
663 selector.close()
649 selector.close()
664
650
665 def _acceptnewconnection(self, sock, selector):
651 def _acceptnewconnection(self, sock, selector):
666 h = self._servicehandler
652 h = self._servicehandler
667 try:
653 try:
668 conn, _addr = sock.accept()
654 conn, _addr = sock.accept()
669 except socket.error as inst:
655 except socket.error as inst:
670 if inst.args[0] == errno.EINTR:
656 if inst.args[0] == errno.EINTR:
671 return
657 return
672 raise
658 raise
673
659
674 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
660 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
675 # to prevent COW memory from being touched by GC.
661 # to prevent COW memory from being touched by GC.
676 # https://instagram-engineering.com/
662 # https://instagram-engineering.com/
677 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
663 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
678 pid = os.fork()
664 pid = os.fork()
679 if pid:
665 if pid:
680 try:
666 try:
681 self.ui.log(
667 self.ui.log(
682 b'cmdserver', b'forked worker process (pid=%d)\n', pid
668 b'cmdserver', b'forked worker process (pid=%d)\n', pid
683 )
669 )
684 self._workerpids.add(pid)
670 self._workerpids.add(pid)
685 h.newconnection()
671 h.newconnection()
686 finally:
672 finally:
687 conn.close() # release handle in parent process
673 conn.close() # release handle in parent process
688 else:
674 else:
689 try:
675 try:
690 selector.close()
676 selector.close()
691 sock.close()
677 sock.close()
692 self._mainipc.close()
678 self._mainipc.close()
693 self._runworker(conn)
679 self._runworker(conn)
694 conn.close()
680 conn.close()
695 self._workeripc.close()
681 self._workeripc.close()
696 os._exit(0)
682 os._exit(0)
697 except: # never return, hence no re-raises
683 except: # never return, hence no re-raises
698 try:
684 try:
699 self.ui.traceback(force=True)
685 self.ui.traceback(force=True)
700 finally:
686 finally:
701 os._exit(255)
687 os._exit(255)
702
688
703 def _handlemainipc(self, sock, selector):
689 def _handlemainipc(self, sock, selector):
704 """Process messages sent from a worker"""
690 """Process messages sent from a worker"""
705 try:
691 try:
706 path = sock.recv(32768) # large enough to receive path
692 path = sock.recv(32768) # large enough to receive path
707 except socket.error as inst:
693 except socket.error as inst:
708 if inst.args[0] == errno.EINTR:
694 if inst.args[0] == errno.EINTR:
709 return
695 return
710 raise
696 raise
711 self._repoloader.load(path)
697 self._repoloader.load(path)
712
698
713 def _sigchldhandler(self, signal, frame):
699 def _sigchldhandler(self, signal, frame):
714 self._reapworkers(os.WNOHANG)
700 self._reapworkers(os.WNOHANG)
715
701
716 def _reapworkers(self, options):
702 def _reapworkers(self, options):
717 while self._workerpids:
703 while self._workerpids:
718 try:
704 try:
719 pid, _status = os.waitpid(-1, options)
705 pid, _status = os.waitpid(-1, options)
720 except OSError as inst:
706 except OSError as inst:
721 if inst.errno == errno.EINTR:
707 if inst.errno == errno.EINTR:
722 continue
708 continue
723 if inst.errno != errno.ECHILD:
709 if inst.errno != errno.ECHILD:
724 raise
710 raise
725 # no child processes at all (reaped by other waitpid()?)
711 # no child processes at all (reaped by other waitpid()?)
726 self._workerpids.clear()
712 self._workerpids.clear()
727 return
713 return
728 if pid == 0:
714 if pid == 0:
729 # no waitable child processes
715 # no waitable child processes
730 return
716 return
731 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
717 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
732 self._workerpids.discard(pid)
718 self._workerpids.discard(pid)
733
719
734 def _runworker(self, conn):
720 def _runworker(self, conn):
735 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
721 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
736 _initworkerprocess()
722 _initworkerprocess()
737 h = self._servicehandler
723 h = self._servicehandler
738 try:
724 try:
739 _serverequest(
725 _serverequest(
740 self.ui,
726 self.ui,
741 self.repo,
727 self.repo,
742 conn,
728 conn,
743 h.createcmdserver,
729 h.createcmdserver,
744 prereposetups=[self._reposetup],
730 prereposetups=[self._reposetup],
745 )
731 )
746 finally:
732 finally:
747 gc.collect() # trigger __del__ since worker process uses os._exit
733 gc.collect() # trigger __del__ since worker process uses os._exit
748
734
749 def _reposetup(self, ui, repo):
735 def _reposetup(self, ui, repo):
750 if not repo.local():
736 if not repo.local():
751 return
737 return
752
738
753 class unixcmdserverrepo(repo.__class__):
739 class unixcmdserverrepo(repo.__class__):
754 def close(self):
740 def close(self):
755 super(unixcmdserverrepo, self).close()
741 super(unixcmdserverrepo, self).close()
756 try:
742 try:
757 self._cmdserveripc.send(self.root)
743 self._cmdserveripc.send(self.root)
758 except socket.error:
744 except socket.error:
759 self.ui.log(
745 self.ui.log(
760 b'cmdserver', b'failed to send repo root to master\n'
746 b'cmdserver', b'failed to send repo root to master\n'
761 )
747 )
762
748
763 repo.__class__ = unixcmdserverrepo
749 repo.__class__ = unixcmdserverrepo
764 repo._cmdserveripc = self._workeripc
750 repo._cmdserveripc = self._workeripc
765
751
766 cachedrepo = self._repoloader.get(repo.root)
752 cachedrepo = self._repoloader.get(repo.root)
767 if cachedrepo is None:
753 if cachedrepo is None:
768 return
754 return
769 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
755 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
770 repocache.copycache(cachedrepo, repo)
756 repocache.copycache(cachedrepo, repo)
@@ -1,473 +1,469 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8
8
9 import errno
9 import errno
10 import os
10 import os
11 import pickle
11 import pickle
12 import selectors
12 import signal
13 import signal
13 import sys
14 import sys
14 import threading
15 import threading
15 import time
16 import time
16
17
17 try:
18 import selectors
19
20 selectors.BaseSelector
21 except ImportError:
22 from .thirdparty import selectors2 as selectors
23
24 from .i18n import _
18 from .i18n import _
25 from . import (
19 from . import (
26 encoding,
20 encoding,
27 error,
21 error,
28 pycompat,
22 pycompat,
29 scmutil,
23 scmutil,
30 )
24 )
31
25
32
26
33 def countcpus():
27 def countcpus():
34 '''try to count the number of CPUs on the system'''
28 '''try to count the number of CPUs on the system'''
35
29
36 # posix
30 # posix
37 try:
31 try:
38 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
32 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
39 if n > 0:
33 if n > 0:
40 return n
34 return n
41 except (AttributeError, ValueError):
35 except (AttributeError, ValueError):
42 pass
36 pass
43
37
44 # windows
38 # windows
45 try:
39 try:
46 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
40 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
47 if n > 0:
41 if n > 0:
48 return n
42 return n
49 except (KeyError, ValueError):
43 except (KeyError, ValueError):
50 pass
44 pass
51
45
52 return 1
46 return 1
53
47
54
48
55 def _numworkers(ui):
49 def _numworkers(ui):
56 s = ui.config(b'worker', b'numcpus')
50 s = ui.config(b'worker', b'numcpus')
57 if s:
51 if s:
58 try:
52 try:
59 n = int(s)
53 n = int(s)
60 if n >= 1:
54 if n >= 1:
61 return n
55 return n
62 except ValueError:
56 except ValueError:
63 raise error.Abort(_(b'number of cpus must be an integer'))
57 raise error.Abort(_(b'number of cpus must be an integer'))
64 return min(max(countcpus(), 4), 32)
58 return min(max(countcpus(), 4), 32)
65
59
66
60
67 def ismainthread():
61 def ismainthread():
68 return threading.current_thread() == threading.main_thread()
62 return threading.current_thread() == threading.main_thread()
69
63
70
64
71 class _blockingreader:
65 class _blockingreader:
72 """Wrap unbuffered stream such that pickle.load() works with it.
66 """Wrap unbuffered stream such that pickle.load() works with it.
73
67
74 pickle.load() expects that calls to read() and readinto() read as many
68 pickle.load() expects that calls to read() and readinto() read as many
75 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
69 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
76 pickle.load() raises an EOFError.
70 pickle.load() raises an EOFError.
77 """
71 """
78
72
79 def __init__(self, wrapped):
73 def __init__(self, wrapped):
80 self._wrapped = wrapped
74 self._wrapped = wrapped
81
75
82 def readline(self):
76 def readline(self):
83 return self._wrapped.readline()
77 return self._wrapped.readline()
84
78
85 def readinto(self, buf):
79 def readinto(self, buf):
86 pos = 0
80 pos = 0
87 size = len(buf)
81 size = len(buf)
88
82
89 with memoryview(buf) as view:
83 with memoryview(buf) as view:
90 while pos < size:
84 while pos < size:
91 with view[pos:] as subview:
85 with view[pos:] as subview:
92 ret = self._wrapped.readinto(subview)
86 ret = self._wrapped.readinto(subview)
93 if not ret:
87 if not ret:
94 break
88 break
95 pos += ret
89 pos += ret
96
90
97 return pos
91 return pos
98
92
99 # issue multiple reads until size is fulfilled (or EOF is encountered)
93 # issue multiple reads until size is fulfilled (or EOF is encountered)
100 def read(self, size=-1):
94 def read(self, size=-1):
101 if size < 0:
95 if size < 0:
102 return self._wrapped.readall()
96 return self._wrapped.readall()
103
97
104 buf = bytearray(size)
98 buf = bytearray(size)
105 n_read = self.readinto(buf)
99 n_read = self.readinto(buf)
106 del buf[n_read:]
100 del buf[n_read:]
107 return bytes(buf)
101 return bytes(buf)
108
102
109
103
110 if pycompat.isposix or pycompat.iswindows:
104 if pycompat.isposix or pycompat.iswindows:
111 _STARTUP_COST = 0.01
105 _STARTUP_COST = 0.01
112 # The Windows worker is thread based. If tasks are CPU bound, threads
106 # The Windows worker is thread based. If tasks are CPU bound, threads
113 # in the presence of the GIL result in excessive context switching and
107 # in the presence of the GIL result in excessive context switching and
114 # this overhead can slow down execution.
108 # this overhead can slow down execution.
115 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
109 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
116 else:
110 else:
117 _STARTUP_COST = 1e30
111 _STARTUP_COST = 1e30
118 _DISALLOW_THREAD_UNSAFE = False
112 _DISALLOW_THREAD_UNSAFE = False
119
113
120
114
121 def worthwhile(ui, costperop, nops, threadsafe=True):
115 def worthwhile(ui, costperop, nops, threadsafe=True):
122 """try to determine whether the benefit of multiple processes can
116 """try to determine whether the benefit of multiple processes can
123 outweigh the cost of starting them"""
117 outweigh the cost of starting them"""
124
118
125 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
119 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
126 return False
120 return False
127
121
128 linear = costperop * nops
122 linear = costperop * nops
129 workers = _numworkers(ui)
123 workers = _numworkers(ui)
130 benefit = linear - (_STARTUP_COST * workers + linear / workers)
124 benefit = linear - (_STARTUP_COST * workers + linear / workers)
131 return benefit >= 0.15
125 return benefit >= 0.15
132
126
133
127
134 def worker(
128 def worker(
135 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
129 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
136 ):
130 ):
137 """run a function, possibly in parallel in multiple worker
131 """run a function, possibly in parallel in multiple worker
138 processes.
132 processes.
139
133
140 returns a progress iterator
134 returns a progress iterator
141
135
142 costperarg - cost of a single task
136 costperarg - cost of a single task
143
137
144 func - function to run. It is expected to return a progress iterator.
138 func - function to run. It is expected to return a progress iterator.
145
139
146 staticargs - arguments to pass to every invocation of the function
140 staticargs - arguments to pass to every invocation of the function
147
141
148 args - arguments to split into chunks, to pass to individual
142 args - arguments to split into chunks, to pass to individual
149 workers
143 workers
150
144
151 hasretval - when True, func and the current function return an progress
145 hasretval - when True, func and the current function return an progress
152 iterator then a dict (encoded as an iterator that yield many (False, ..)
146 iterator then a dict (encoded as an iterator that yield many (False, ..)
153 then a (True, dict)). The dicts are joined in some arbitrary order, so
147 then a (True, dict)). The dicts are joined in some arbitrary order, so
154 overlapping keys are a bad idea.
148 overlapping keys are a bad idea.
155
149
156 threadsafe - whether work items are thread safe and can be executed using
150 threadsafe - whether work items are thread safe and can be executed using
157 a thread-based worker. Should be disabled for CPU heavy tasks that don't
151 a thread-based worker. Should be disabled for CPU heavy tasks that don't
158 release the GIL.
152 release the GIL.
159 """
153 """
160 enabled = ui.configbool(b'worker', b'enabled')
154 enabled = ui.configbool(b'worker', b'enabled')
161 if enabled and _platformworker is _posixworker and not ismainthread():
155 if enabled and _platformworker is _posixworker and not ismainthread():
162 # The POSIX worker has to install a handler for SIGCHLD.
156 # The POSIX worker has to install a handler for SIGCHLD.
163 # Python up to 3.9 only allows this in the main thread.
157 # Python up to 3.9 only allows this in the main thread.
164 enabled = False
158 enabled = False
165
159
166 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
160 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
167 return _platformworker(ui, func, staticargs, args, hasretval)
161 return _platformworker(ui, func, staticargs, args, hasretval)
168 return func(*staticargs + (args,))
162 return func(*staticargs + (args,))
169
163
170
164
171 def _posixworker(ui, func, staticargs, args, hasretval):
165 def _posixworker(ui, func, staticargs, args, hasretval):
172 workers = _numworkers(ui)
166 workers = _numworkers(ui)
173 oldhandler = signal.getsignal(signal.SIGINT)
167 oldhandler = signal.getsignal(signal.SIGINT)
174 signal.signal(signal.SIGINT, signal.SIG_IGN)
168 signal.signal(signal.SIGINT, signal.SIG_IGN)
175 pids, problem = set(), [0]
169 pids, problem = set(), [0]
176
170
177 def killworkers():
171 def killworkers():
178 # unregister SIGCHLD handler as all children will be killed. This
172 # unregister SIGCHLD handler as all children will be killed. This
179 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
173 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
180 # could be updated while iterating, which would cause inconsistency.
174 # could be updated while iterating, which would cause inconsistency.
181 signal.signal(signal.SIGCHLD, oldchldhandler)
175 signal.signal(signal.SIGCHLD, oldchldhandler)
182 # if one worker bails, there's no good reason to wait for the rest
176 # if one worker bails, there's no good reason to wait for the rest
183 for p in pids:
177 for p in pids:
184 try:
178 try:
185 os.kill(p, signal.SIGTERM)
179 os.kill(p, signal.SIGTERM)
186 except OSError as err:
180 except OSError as err:
187 if err.errno != errno.ESRCH:
181 if err.errno != errno.ESRCH:
188 raise
182 raise
189
183
190 def waitforworkers(blocking=True):
184 def waitforworkers(blocking=True):
191 for pid in pids.copy():
185 for pid in pids.copy():
192 p = st = 0
186 p = st = 0
193 while True:
187 while True:
194 try:
188 try:
195 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
189 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
196 break
190 break
197 except OSError as e:
191 except OSError as e:
198 if e.errno == errno.EINTR:
192 if e.errno == errno.EINTR:
199 continue
193 continue
200 elif e.errno == errno.ECHILD:
194 elif e.errno == errno.ECHILD:
201 # child would already be reaped, but pids yet been
195 # child would already be reaped, but pids yet been
202 # updated (maybe interrupted just after waitpid)
196 # updated (maybe interrupted just after waitpid)
203 pids.discard(pid)
197 pids.discard(pid)
204 break
198 break
205 else:
199 else:
206 raise
200 raise
207 if not p:
201 if not p:
208 # skip subsequent steps, because child process should
202 # skip subsequent steps, because child process should
209 # be still running in this case
203 # be still running in this case
210 continue
204 continue
211 pids.discard(p)
205 pids.discard(p)
212 st = _exitstatus(st)
206 st = _exitstatus(st)
213 if st and not problem[0]:
207 if st and not problem[0]:
214 problem[0] = st
208 problem[0] = st
215
209
216 def sigchldhandler(signum, frame):
210 def sigchldhandler(signum, frame):
217 waitforworkers(blocking=False)
211 waitforworkers(blocking=False)
218 if problem[0]:
212 if problem[0]:
219 killworkers()
213 killworkers()
220
214
221 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
215 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
222 ui.flush()
216 ui.flush()
223 parentpid = os.getpid()
217 parentpid = os.getpid()
224 pipes = []
218 pipes = []
225 retval = {}
219 retval = {}
226 for pargs in partition(args, min(workers, len(args))):
220 for pargs in partition(args, min(workers, len(args))):
227 # Every worker gets its own pipe to send results on, so we don't have to
221 # Every worker gets its own pipe to send results on, so we don't have to
228 # implement atomic writes larger than PIPE_BUF. Each forked process has
222 # implement atomic writes larger than PIPE_BUF. Each forked process has
229 # its own pipe's descriptors in the local variables, and the parent
223 # its own pipe's descriptors in the local variables, and the parent
230 # process has the full list of pipe descriptors (and it doesn't really
224 # process has the full list of pipe descriptors (and it doesn't really
231 # care what order they're in).
225 # care what order they're in).
232 rfd, wfd = os.pipe()
226 rfd, wfd = os.pipe()
233 pipes.append((rfd, wfd))
227 pipes.append((rfd, wfd))
234 # make sure we use os._exit in all worker code paths. otherwise the
228 # make sure we use os._exit in all worker code paths. otherwise the
235 # worker may do some clean-ups which could cause surprises like
229 # worker may do some clean-ups which could cause surprises like
236 # deadlock. see sshpeer.cleanup for example.
230 # deadlock. see sshpeer.cleanup for example.
237 # override error handling *before* fork. this is necessary because
231 # override error handling *before* fork. this is necessary because
238 # exception (signal) may arrive after fork, before "pid =" assignment
232 # exception (signal) may arrive after fork, before "pid =" assignment
239 # completes, and other exception handler (dispatch.py) can lead to
233 # completes, and other exception handler (dispatch.py) can lead to
240 # unexpected code path without os._exit.
234 # unexpected code path without os._exit.
241 ret = -1
235 ret = -1
242 try:
236 try:
243 pid = os.fork()
237 pid = os.fork()
244 if pid == 0:
238 if pid == 0:
245 signal.signal(signal.SIGINT, oldhandler)
239 signal.signal(signal.SIGINT, oldhandler)
246 signal.signal(signal.SIGCHLD, oldchldhandler)
240 signal.signal(signal.SIGCHLD, oldchldhandler)
247
241
248 def workerfunc():
242 def workerfunc():
249 for r, w in pipes[:-1]:
243 for r, w in pipes[:-1]:
250 os.close(r)
244 os.close(r)
251 os.close(w)
245 os.close(w)
252 os.close(rfd)
246 os.close(rfd)
253 with os.fdopen(wfd, 'wb') as wf:
247 with os.fdopen(wfd, 'wb') as wf:
254 for result in func(*(staticargs + (pargs,))):
248 for result in func(*(staticargs + (pargs,))):
255 pickle.dump(result, wf)
249 pickle.dump(result, wf)
256 wf.flush()
250 wf.flush()
257 return 0
251 return 0
258
252
259 ret = scmutil.callcatch(ui, workerfunc)
253 ret = scmutil.callcatch(ui, workerfunc)
260 except: # parent re-raises, child never returns
254 except: # parent re-raises, child never returns
261 if os.getpid() == parentpid:
255 if os.getpid() == parentpid:
262 raise
256 raise
263 exctype = sys.exc_info()[0]
257 exctype = sys.exc_info()[0]
264 force = not issubclass(exctype, KeyboardInterrupt)
258 force = not issubclass(exctype, KeyboardInterrupt)
265 ui.traceback(force=force)
259 ui.traceback(force=force)
266 finally:
260 finally:
267 if os.getpid() != parentpid:
261 if os.getpid() != parentpid:
268 try:
262 try:
269 ui.flush()
263 ui.flush()
270 except: # never returns, no re-raises
264 except: # never returns, no re-raises
271 pass
265 pass
272 finally:
266 finally:
273 os._exit(ret & 255)
267 os._exit(ret & 255)
274 pids.add(pid)
268 pids.add(pid)
275 selector = selectors.DefaultSelector()
269 selector = selectors.DefaultSelector()
276 for rfd, wfd in pipes:
270 for rfd, wfd in pipes:
277 os.close(wfd)
271 os.close(wfd)
278 # The stream has to be unbuffered. Otherwise, if all data is read from
272 # The stream has to be unbuffered. Otherwise, if all data is read from
279 # the raw file into the buffer, the selector thinks that the FD is not
273 # the raw file into the buffer, the selector thinks that the FD is not
280 # ready to read while pickle.load() could read from the buffer. This
274 # ready to read while pickle.load() could read from the buffer. This
281 # would delay the processing of readable items.
275 # would delay the processing of readable items.
282 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
276 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
283
277
284 def cleanup():
278 def cleanup():
285 signal.signal(signal.SIGINT, oldhandler)
279 signal.signal(signal.SIGINT, oldhandler)
286 waitforworkers()
280 waitforworkers()
287 signal.signal(signal.SIGCHLD, oldchldhandler)
281 signal.signal(signal.SIGCHLD, oldchldhandler)
288 selector.close()
282 selector.close()
289 return problem[0]
283 return problem[0]
290
284
291 try:
285 try:
292 openpipes = len(pipes)
286 openpipes = len(pipes)
293 while openpipes > 0:
287 while openpipes > 0:
294 for key, events in selector.select():
288 for key, events in selector.select():
295 try:
289 try:
296 # The pytype error likely goes away on a modern version of
290 # The pytype error likely goes away on a modern version of
297 # pytype having a modern typeshed snapshot.
291 # pytype having a modern typeshed snapshot.
298 # pytype: disable=wrong-arg-types
292 # pytype: disable=wrong-arg-types
299 res = pickle.load(_blockingreader(key.fileobj))
293 res = pickle.load(_blockingreader(key.fileobj))
300 # pytype: enable=wrong-arg-types
294 # pytype: enable=wrong-arg-types
301 if hasretval and res[0]:
295 if hasretval and res[0]:
302 retval.update(res[1])
296 retval.update(res[1])
303 else:
297 else:
304 yield res
298 yield res
305 except EOFError:
299 except EOFError:
306 selector.unregister(key.fileobj)
300 selector.unregister(key.fileobj)
301 # pytype: disable=attribute-error
307 key.fileobj.close()
302 key.fileobj.close()
303 # pytype: enable=attribute-error
308 openpipes -= 1
304 openpipes -= 1
309 except IOError as e:
305 except IOError as e:
310 if e.errno == errno.EINTR:
306 if e.errno == errno.EINTR:
311 continue
307 continue
312 raise
308 raise
313 except: # re-raises
309 except: # re-raises
314 killworkers()
310 killworkers()
315 cleanup()
311 cleanup()
316 raise
312 raise
317 status = cleanup()
313 status = cleanup()
318 if status:
314 if status:
319 if status < 0:
315 if status < 0:
320 os.kill(os.getpid(), -status)
316 os.kill(os.getpid(), -status)
321 raise error.WorkerError(status)
317 raise error.WorkerError(status)
322 if hasretval:
318 if hasretval:
323 yield True, retval
319 yield True, retval
324
320
325
321
326 def _posixexitstatus(code):
322 def _posixexitstatus(code):
327 """convert a posix exit status into the same form returned by
323 """convert a posix exit status into the same form returned by
328 os.spawnv
324 os.spawnv
329
325
330 returns None if the process was stopped instead of exiting"""
326 returns None if the process was stopped instead of exiting"""
331 if os.WIFEXITED(code):
327 if os.WIFEXITED(code):
332 return os.WEXITSTATUS(code)
328 return os.WEXITSTATUS(code)
333 elif os.WIFSIGNALED(code):
329 elif os.WIFSIGNALED(code):
334 return -(os.WTERMSIG(code))
330 return -(os.WTERMSIG(code))
335
331
336
332
def _windowsworker(ui, func, staticargs, args, hasretval):
    """Thread-based worker used where fork() is unavailable (Windows).

    Partitions ``args`` into a task queue, runs ``func`` over the pieces
    in daemon threads, and yields results as they arrive.  If
    ``hasretval`` is true, results flagged with a true first element are
    accumulated into a dict that is yielded last as ``(True, retval)``.
    Exceptions raised inside a worker are re-raised in this (the main)
    thread.
    """

    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # cooperative cancellation flag, set via interrupt()
            self._interrupted = False
            self.daemon = True
            # exception raised by func, for the main thread to resurface
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        # another worker drained the queue between our
                        # empty() check and get_nowait(); we are done
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the inital exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # drain results produced so far before blocking on a join
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            # short join so we keep polling the result queue responsively
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    # resurface the worker's exception in the main thread
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # all workers finished; flush any remaining results
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
435
431
# Select the platform-appropriate worker implementation: threads on
# Windows (no fork()), fork-based processes elsewhere.  _exitstatus is
# only needed by the fork-based implementation.
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
441
437
442
438
def partition(lst, nslices):
    """Split ``lst`` into ``nslices`` interleaved slices of roughly equal size.

    Slice ``i`` receives every ``nslices``-th element starting at offset
    ``i`` (i.e. ``lst[i::nslices]``).  If we ever write workers that need
    to preserve grouping in input we should consider allowing callers to
    specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are
    involved: single-threaded Mercurial deliberately creates and visits
    files in a fixed (alphabetical) order, so a typical filesystem
    allocates them in nearby disk regions and revisiting them in that
    order maximizes locality and lets OS/disk caching and read-ahead
    work.  The effect is significant on spinning disks — circa Mercurial
    v0.4, when revlogs were named by hashes of filenames, tarring a repo
    and copying it to another disk effectively randomized on-disk revlog
    order and dropped a kernel-checkout benchmark ~10x as the workload
    went from streaming to random I/O.

    What we should really be doing is have workers read filenames from
    an ordered queue: that preserves locality and also keeps any worker
    from getting more than one file out of balance.
    """
    offset = 0
    while offset < nslices:
        yield lst[offset::nslices]
        offset += 1
1 NO CONTENT: file was removed
NO CONTENT: file was removed
This diff has been collapsed as it changes many lines, (743 lines changed) Show them Hide them
General Comments 0
You need to be logged in to leave comments. Login now