worker: ignore meaningless exit status indication returned by os.waitpid()...
FUJIWARA Katsunori
r31063:18fb3cf5 stable
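
The change below applies to the non-blocking path of waitforworkers(), where os.waitpid() is called with os.WNOHANG. In that mode waitpid() returns (0, 0) while the child is still running, so the returned status carries no information and must not be passed to _exitstatus() or recorded in problem[0]. A minimal, POSIX-only sketch of that behaviour (illustration only, not part of the patch):

import os
import time

pid = os.fork()
if pid == 0:
    # child: stay alive long enough for the parent's non-blocking wait
    time.sleep(2)
    os._exit(7)

# non-blocking wait: the child has not exited yet, so waitpid() returns
# (0, 0) and the "status" of 0 is meaningless
print(os.waitpid(pid, os.WNOHANG))      # (0, 0)

# blocking wait: now the real (pid, status) pair comes back
p, st = os.waitpid(pid, 0)
print(p == pid, os.WEXITSTATUS(st))     # True 7
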
@@ -1,224 +1,227 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 import errno
 import os
 import signal
 import sys

 from .i18n import _
 from . import (
     encoding,
     error,
     pycompat,
     scmutil,
     util,
 )

 def countcpus():
     '''try to count the number of CPUs on the system'''

     # posix
     try:
         n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass

     # windows
     try:
         n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass

     return 1

 def _numworkers(ui):
     s = ui.config('worker', 'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)

 if pycompat.osname == 'posix':
     _startupcost = 0.01
 else:
     _startupcost = 1e30

 def worthwhile(ui, costperop, nops):
     '''try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them'''
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_startupcost * workers + linear / workers)
     return benefit >= 0.15

 def worker(ui, costperarg, func, staticargs, args):
     '''run a function, possibly in parallel in multiple worker
     processes.

     returns a progress iterator

     costperarg - cost of a single task

     func - function to run

     staticargs - arguments to pass to every invocation of the function

     args - arguments to split into chunks, to pass to individual
     workers
     '''
     if worthwhile(ui, costperarg, len(args)):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))

 def _posixworker(ui, func, staticargs, args):
     rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
     def killworkers():
         # unregister SIGCHLD handler as all children will be killed. This
         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
         # could be updated while iterating, which would cause inconsistency.
         signal.signal(signal.SIGCHLD, oldchldhandler)
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
     def waitforworkers(blocking=True):
         for pid in pids.copy():
             p = st = 0
             while True:
                 try:
                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                     break
                 except OSError as e:
                     if e.errno == errno.EINTR:
                         continue
                     elif e.errno == errno.ECHILD:
                         # child would already be reaped, but pids yet been
                         # updated (maybe interrupted just after waitpid)
                         pids.discard(pid)
                         break
                     else:
                         raise
-            if p:
-                pids.discard(p)
-                st = _exitstatus(st)
+            if not p:
+                # skip subsequent steps, because child process should
+                # be still running in this case
+                continue
+            pids.discard(p)
+            st = _exitstatus(st)
             if st and not problem[0]:
                 problem[0] = st
     def sigchldhandler(signum, frame):
         waitforworkers(blocking=False)
         if problem[0]:
             killworkers()
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
             signal.signal(signal.SIGINT, oldhandler)
             signal.signal(signal.SIGCHLD, oldchldhandler)

             def workerfunc():
                 os.close(rfd)
                 for i, item in func(*(staticargs + (pargs,))):
                     os.write(wfd, '%d %s\n' % (i, item))

             # make sure we use os._exit in all code paths. otherwise the worker
             # may do some clean-ups which could cause surprises like deadlock.
             # see sshpeer.cleanup for example.
             try:
                 scmutil.callcatch(ui, workerfunc)
             except KeyboardInterrupt:
                 os._exit(255)
             except: # never return, therefore no re-raises
                 try:
                     ui.traceback()
                 finally:
                     os._exit(255)
             else:
                 os._exit(0)
         pids.add(pid)
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
         signal.signal(signal.SIGCHLD, oldchldhandler)
         status = problem[0]
         if status:
             if status < 0:
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
         for line in util.iterfile(fp):
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
         killworkers()
         cleanup()
         raise
     cleanup()

 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv

     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)

 if pycompat.osname != 'nt':
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus

 def partition(lst, nslices):
     '''partition a list into N slices of roughly equal size

     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.

     mpm is not a fan of this partitioning strategy when files are involved.
     In his words:

         Single-threaded Mercurial makes a point of creating and visiting
         files in a fixed order (alphabetical). When creating files in order,
         a typical filesystem is likely to allocate them on nearby regions on
         disk. Thus, when revisiting in the same order, locality is maximized
         and various forms of OS and disk-level caching and read-ahead get a
         chance to work.

         This effect can be quite significant on spinning disks. I discovered it
         circa Mercurial v0.4 when revlogs were named by hashes of filenames.
         Tarring a repo and copying it to another disk effectively randomized
         the revlog ordering on disk by sorting the revlogs by hash and suddenly
         performance of my kernel checkout benchmark dropped by ~10x because the
         "working set" of sectors visited no longer fit in the drive's cache and
         the workload switched from streaming to random I/O.

         What we should really be doing is have workers read filenames from a
         ordered queue. This preserves locality and also keeps any worker from
         getting more than one file out of balance.
     '''
     for i in range(nslices):
         yield lst[i::nslices]
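
As a side note on the partition() helper left unchanged above: the "every Nth element" strategy its docstring describes is plain extended slicing, which is easy to check standalone (illustration only, using an ordinary list):

def partition(lst, nslices):
    # same strategy as above: slice i takes elements i, i+nslices, i+2*nslices, ...
    for i in range(nslices):
        yield lst[i::nslices]

print(list(partition(list(range(10)), 3)))
# [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]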