##// END OF EJS Templates
worker: kill workers after all zombie processes are reaped...
Yuya Nishihara -
r30424:f2d13eb8 default
parent child Browse files
Show More
@@ -1,206 +1,207 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14
14
15 from .i18n import _
15 from .i18n import _
16 from . import (
16 from . import (
17 error,
17 error,
18 util,
18 util,
19 )
19 )
20
20
def countcpus():
    '''try to count the number of CPUs on the system'''
    # POSIX exposes the online-processor count via sysconf
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        ncpus = 0
    if ncpus > 0:
        return ncpus
    # Windows publishes the count in the environment instead
    try:
        ncpus = int(os.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        ncpus = 0
    if ncpus > 0:
        return ncpus
    # conservative fallback when detection fails
    return 1
41
41
def _numworkers(ui):
    # honor an explicit worker.numcpus setting when present
    configured = ui.config('worker', 'numcpus')
    if configured:
        try:
            n = int(configured)
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
        if n >= 1:
            return n
    # otherwise autodetect, clamped to the range [4, 32]
    return min(max(countcpus(), 4), 32)
52
52
53 if os.name == 'posix':
53 if os.name == 'posix':
54 _startupcost = 0.01
54 _startupcost = 0.01
55 else:
55 else:
56 _startupcost = 1e30
56 _startupcost = 1e30
57
57
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    parallelcost = _startupcost * nworkers + serialcost / nworkers
    # only parallelize when the estimated saving is non-trivial
    return serialcost - parallelcost >= 0.15
65
65
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if not worthwhile(ui, costperarg, len(args)):
        # parallelism would not pay off: run everything in this process
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args)
84
84
def _posixworker(ui, func, staticargs, args):
    # Fork-based parallelism: fork one child per slice of args; each child
    # writes '%d %s\n' result lines into a shared pipe, and this generator
    # parses the pipe in the parent and yields (int, str) pairs.
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # ignore SIGINT in the parent; each child restores the original handler
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem[0] holds the first non-zero child exit status seen; it is a
    # one-element list so the nested closures below can mutate it
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: the child is already gone, which is fine here
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # reap children, recording the first bad exit status in problem[0];
        # iterate over a copy because reaped pids are removed from the set
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        # interrupted by a signal: retry the wait
                        continue
                    elif e.errno == errno.ECHILD:
                        break # ignore ECHILD
                    else:
                        raise
            if p:
                pids.remove(p)
                st = _exitstatus(st)
                if st and not problem[0]:
                    problem[0] = st
    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            # a worker failed: kill the remaining ones, but only after all
            # zombies have been reaped, so killworkers() iterates a stable
            # pids set
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            # child: restore signal handlers, keep only the pipe's write
            # end, run its slice of work and stream results back
            signal.signal(signal.SIGINT, oldhandler)
            signal.signal(signal.SIGCHLD, oldchldhandler)
            try:
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                os._exit(0)
            except KeyboardInterrupt:
                os._exit(255)
            # other exceptions are allowed to propagate, we rely
            # on lock.py's pid checks to avoid release callbacks
        pids.add(pid)
    # parent: close the write end so EOF arrives once every child exits
    os.close(wfd)
    fp = os.fdopen(rfd, 'rb', 0)
    def cleanup():
        # restore handlers, reap stragglers, then propagate any failure
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                # a child died from a signal: re-raise it in ourselves
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            # strip the trailing newline from the item text
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
161
162
162 def _posixexitstatus(code):
163 def _posixexitstatus(code):
163 '''convert a posix exit status into the same form returned by
164 '''convert a posix exit status into the same form returned by
164 os.spawnv
165 os.spawnv
165
166
166 returns None if the process was stopped instead of exiting'''
167 returns None if the process was stopped instead of exiting'''
167 if os.WIFEXITED(code):
168 if os.WIFEXITED(code):
168 return os.WEXITSTATUS(code)
169 return os.WEXITSTATUS(code)
169 elif os.WIFSIGNALED(code):
170 elif os.WIFSIGNALED(code):
170 return -os.WTERMSIG(code)
171 return -os.WTERMSIG(code)
171
172
# wire up the fork()-based implementation everywhere except Windows,
# which has no fork()
if os.name != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
175
176
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    Strategy: slice i receives every nslices-th element of the input,
    starting at offset i.  If we ever write workers that need to preserve
    grouping in input we should consider allowing callers to specify a
    partition strategy.

    mpm is not a fan of this partitioning strategy when files are
    involved.  In his words: single-threaded Mercurial deliberately
    creates and visits files in a fixed (alphabetical) order, so a
    typical filesystem allocates them on nearby disk regions; revisiting
    them in the same order maximizes locality and lets OS/disk caching
    and read-ahead work.  The effect can be significant on spinning
    disks -- circa Mercurial v0.4, when revlogs were named by hashes of
    filenames, tarring a repo and copying it to another disk effectively
    randomized the on-disk revlog order and his kernel checkout
    benchmark dropped ~10x as the workload went from streaming to random
    I/O.  What we should really do is have workers read filenames from
    an ordered queue: that preserves locality and keeps any worker at
    most one file out of balance.
    '''
    return (lst[offset::nslices] for offset in range(nslices))
General Comments 0
You need to be logged in to leave comments. Login now