##// END OF EJS Templates
workers: implemented worker on windows...
Wojciech Lis -
r35427:02b36e86 default
parent child Browse files
Show More
@@ -1,240 +1,285 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14
15
15 from .i18n import _
16 from .i18n import _
16 from . import (
17 from . import (
17 encoding,
18 encoding,
18 error,
19 error,
19 pycompat,
20 pycompat,
20 scmutil,
21 scmutil,
21 util,
22 util,
22 )
23 )
23
24
def countcpus():
    '''try to count the number of CPUs on the system'''

    # POSIX: ask sysconf for the number of online processors
    try:
        ncpus = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # Windows: the count is exported through the environment
    try:
        ncpus = int(encoding.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # detection failed; assume a single CPU
    return 1
44
45
45 def _numworkers(ui):
46 def _numworkers(ui):
46 s = ui.config('worker', 'numcpus')
47 s = ui.config('worker', 'numcpus')
47 if s:
48 if s:
48 try:
49 try:
49 n = int(s)
50 n = int(s)
50 if n >= 1:
51 if n >= 1:
51 return n
52 return n
52 except ValueError:
53 except ValueError:
53 raise error.Abort(_('number of cpus must be an integer'))
54 raise error.Abort(_('number of cpus must be an integer'))
54 return min(max(countcpus(), 4), 32)
55 return min(max(countcpus(), 4), 32)
55
56
# Estimated cost of spawning one worker: cheap where we have a real
# implementation (fork on POSIX, threads on Windows), effectively
# infinite elsewhere so parallelism is never chosen.
_startupcost = 0.01 if (pycompat.isposix or pycompat.iswindows) else 1e30
60
61
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # parallel run pays a startup cost per worker plus 1/N of the work
    parallelcost = _startupcost * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
68
69
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    # fall back to a plain serial call when parallelism will not pay off
    if not worthwhile(ui, costperarg, len(args)):
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args)
87
88
def _posixworker(ui, func, staticargs, args):
    '''fork one child per chunk of args; children stream "<index> <item>"
    lines back over a shared pipe, which the parent yields as
    (int, str) pairs.  The first failing child's status is propagated
    by cleanup() after all children are reaped.'''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # ignore SIGINT in the parent while children run; each child
    # restores the original handler right after fork
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so the nested closures can mutate it;
    # it records the first non-zero child exit status
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # reap finished children; non-blocking mode is used from the
        # SIGCHLD handler, blocking mode from cleanup()
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st
    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            # a child failed: tear the rest down immediately
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    for pargs in partition(args, workers):
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # child: restore handlers, close the read end, run func
                # and write one result line per item
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    os.close(rfd)
                    for i, item in func(*(staticargs + (pargs,))):
                        os.write(wfd, '%d %s\n' % (i, item))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except: # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except: # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    os.close(wfd)
    fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
    def cleanup():
        # restore signal handlers, reap remaining children and surface
        # the first recorded failure (re-raising its signal if negative)
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            # strip the trailing newline from the item payload
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
195
196
196 def _posixexitstatus(code):
197 def _posixexitstatus(code):
197 '''convert a posix exit status into the same form returned by
198 '''convert a posix exit status into the same form returned by
198 os.spawnv
199 os.spawnv
199
200
200 returns None if the process was stopped instead of exiting'''
201 returns None if the process was stopped instead of exiting'''
201 if os.WIFEXITED(code):
202 if os.WIFEXITED(code):
202 return os.WEXITSTATUS(code)
203 return os.WEXITSTATUS(code)
203 elif os.WIFSIGNALED(code):
204 elif os.WIFSIGNALED(code):
204 return -os.WTERMSIG(code)
205 return -os.WTERMSIG(code)
205
206
def _windowsworker(ui, func, staticargs, args):
    '''thread-based counterpart of _posixworker for Windows: run func
    over chunks of args in worker threads and yield results as they
    arrive.

    Fix: the original forwarded verbose=None to threading.Thread.__init__;
    that parameter only ever existed on Python 2 and raises TypeError on
    Python 3, so it is no longer forwarded.'''
    class Worker(threading.Thread):
        def __init__(self, taskqueue, resultqueue, func, staticargs,
                     group=None, target=None, name=None):
            threading.Thread.__init__(self, group=group, target=target,
                                      name=name)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs

        def run(self):
            while not self._taskqueue.empty():
                try:
                    args = self._taskqueue.get_nowait()
                    for res in self._func(*self._staticargs + (args,)):
                        self._resultqueue.put(res)
                except util.empty:
                    # another thread raced us to the last task; done
                    break

    workers = _numworkers(ui)
    threads = []
    resultqueue = util.queue()
    taskqueue = util.queue()
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    while any(t.is_alive() for t in threads):
        # drain eagerly so results flow while threads are still working
        while not resultqueue.empty():
            yield resultqueue.get()
        t = threads[0]
        t.join(0.05)
        if not t.is_alive():
            threads.remove(t)
    # collect anything produced after the last thread finished
    while not resultqueue.empty():
        yield resultqueue.get()
# pick the platform-appropriate implementation: fork-based workers on
# POSIX, threads on Windows
if not pycompat.iswindows:
    _platformworker = _posixworker
    # exit-status decoding is only needed by the fork-based variant
    _exitstatus = _posixexitstatus
else:
    _platformworker = _windowsworker
209
254
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    # slice k receives elements k, k+N, k+2N, ... of the input
    for offset in range(nslices):
        yield lst[offset::nslices]
General Comments 0
You need to be logged in to leave comments. Login now