##// END OF EJS Templates
worker: make sure killworkers() is never interrupted by another SIGCHLD...
Yuya Nishihara -
r30423:237b2883 default
parent child Browse files
Show More
@@ -1,204 +1,206
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14
14
15 from .i18n import _
15 from .i18n import _
16 from . import (
16 from . import (
17 error,
17 error,
18 util,
18 util,
19 )
19 )
20
20
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: sysconf reports the number of online processors
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        # sysconf missing on this platform, or the name/value was bogus
        pass
    else:
        if ncpus > 0:
            return ncpus

    # windows: the environment advertises the processor count
    try:
        ncpus = int(os.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # conservative fallback when nothing above worked
    return 1
41
41
42 def _numworkers(ui):
42 def _numworkers(ui):
43 s = ui.config('worker', 'numcpus')
43 s = ui.config('worker', 'numcpus')
44 if s:
44 if s:
45 try:
45 try:
46 n = int(s)
46 n = int(s)
47 if n >= 1:
47 if n >= 1:
48 return n
48 return n
49 except ValueError:
49 except ValueError:
50 raise error.Abort(_('number of cpus must be an integer'))
50 raise error.Abort(_('number of cpus must be an integer'))
51 return min(max(countcpus(), 4), 32)
51 return min(max(countcpus(), 4), 32)
52
52
# Estimated cost (in seconds) of starting one worker process.  Forking is
# cheap on posix; everywhere else the huge value makes worthwhile() always
# reject parallelism.
if os.name == 'posix':
    _startupcost = 0.01
else:
    _startupcost = 1e30
57
57
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # parallel cost = process startup overhead + the work divided up
    parallelcost = _startupcost * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
65
65
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    # fall back to a single serial call when forking isn't worth it
    if not worthwhile(ui, costperarg, len(args)):
        return func(*(staticargs + (args,)))
    return _platformworker(ui, func, staticargs, args)
84
84
def _posixworker(ui, func, staticargs, args):
    '''fork-based worker implementation for posix platforms.

    Forks one child per partition of args, each writing "%d %s\n" progress
    records to a shared pipe; the parent yields (int, str) tuples parsed
    from that pipe.  Any worker failure kills the rest and propagates the
    worst exit status.
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # ignore SIGINT in the master; children restore the original handler
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so nested functions can rebind it
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: already exited; anything else is a real error
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # reap children; iterate over a copy since pids shrinks as we go
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        break # ignore ECHILD
                    else:
                        raise
            if p:
                pids.remove(p)
                st = _exitstatus(st)
            # remember the first failure and abort the remaining workers
            if st and not problem[0]:
                problem[0] = st
                killworkers()
    def sigchldhandler(signum, frame):
        # non-blocking reap so the handler never stalls the master
        waitforworkers(blocking=False)
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            # child: restore default signal handling before doing work
            signal.signal(signal.SIGINT, oldhandler)
            signal.signal(signal.SIGCHLD, oldchldhandler)
            try:
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                os._exit(0)
            except KeyboardInterrupt:
                os._exit(255)
                # other exceptions are allowed to propagate, we rely
                # on lock.py's pid checks to avoid release callbacks
        pids.add(pid)
    os.close(wfd)
    # unbuffered so partial worker output is seen promptly
    fp = os.fdopen(rfd, 'rb', 0)
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                # worker died of a signal: re-raise it in the master
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
159
161
160 def _posixexitstatus(code):
162 def _posixexitstatus(code):
161 '''convert a posix exit status into the same form returned by
163 '''convert a posix exit status into the same form returned by
162 os.spawnv
164 os.spawnv
163
165
164 returns None if the process was stopped instead of exiting'''
166 returns None if the process was stopped instead of exiting'''
165 if os.WIFEXITED(code):
167 if os.WIFEXITED(code):
166 return os.WEXITSTATUS(code)
168 return os.WEXITSTATUS(code)
167 elif os.WIFSIGNALED(code):
169 elif os.WIFSIGNALED(code):
168 return -os.WTERMSIG(code)
170 return -os.WTERMSIG(code)
169
171
# Bind the platform-specific implementations.  Windows ('nt') has no fork;
# worthwhile() never selects the parallel path there anyway because of the
# huge _startupcost.
if os.name != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
173
175
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    # slice i gets elements i, i + nslices, i + 2*nslices, ...
    return (lst[offset::nslices] for offset in range(nslices))
General Comments 0
You need to be logged in to leave comments. Login now