worker: fix missed break on successful waitpid()...
Yuya Nishihara
r30422:0e6ce631 default
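The entire change is a single added `break` after the `waitpid()` call in `waitforworkers()` below: the `while True` loop exists only to retry `waitpid()` when it is interrupted (`EINTR`), so once the call succeeds the loop should stop rather than wait on the already-reaped pid again and escape through the `ECHILD` branch. A minimal standalone sketch of the corrected pattern (illustrative only, not part of worker.py; the `reap()` helper is hypothetical):

import errno
import os

def reap(pids, blocking=True):
    '''reap exited children from pids, retrying waitpid() only on EINTR'''
    for pid in pids.copy():
        p = st = 0
        while True:
            try:
                p, st = os.waitpid(pid, 0 if blocking else os.WNOHANG)
                break  # success: this is the break the changeset adds
            except OSError as e:
                if e.errno == errno.EINTR:
                    continue  # interrupted by a signal, retry
                elif e.errno == errno.ECHILD:
                    break     # child already reaped elsewhere, ignore
                raise
        if p:
            pids.remove(p)
            print('pid %d exited with status %d' % (p, os.WEXITSTATUS(st)))

if __name__ == '__main__':
    pid = os.fork()
    if pid == 0:
        os._exit(0)   # child exits immediately
    reap({pid})       # parent reaps it exactly once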
@@ -1,203 +1,204 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import os
import signal
import sys

from .i18n import _
from . import (
    error,
    util,
)

def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(os.environ['NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1

def _numworkers(ui):
    s = ui.config('worker', 'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)

if os.name == 'posix':
    _startupcost = 0.01
else:
    _startupcost = 1e30

def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_startupcost * workers + linear / workers)
    return benefit >= 0.15

def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if worthwhile(ui, costperarg, len(args)):
        return _platformworker(ui, func, staticargs, args)
    return func(*staticargs + (args,))

def _posixworker(ui, func, staticargs, args):
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]
    def killworkers():
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        break # ignore ECHILD
                    else:
                        raise
            if p:
                pids.remove(p)
                st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st
                # unregister SIGCHLD handler as all children will be killed
                signal.signal(signal.SIGCHLD, oldchldhandler)
                killworkers()
    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            signal.signal(signal.SIGINT, oldhandler)
            signal.signal(signal.SIGCHLD, oldchldhandler)
            try:
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                os._exit(0)
            except KeyboardInterrupt:
                os._exit(255)
            # other exceptions are allowed to propagate, we rely
            # on lock.py's pid checks to avoid release callbacks
        pids.add(pid)
    os.close(wfd)
    fp = os.fdopen(rfd, 'rb', 0)
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()

def _posixexitstatus(code):
    '''convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting'''
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)

if os.name != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus

def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    for i in range(nslices):
        yield lst[i::nslices]
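A note on the heuristic in `worthwhile()`: it compares the estimated serial cost (`costperop * nops`) with the startup cost of the workers plus one worker's share of that serial cost, and parallelizes only when the estimated saving is at least 0.15 seconds. A worked example with made-up numbers (only `_startupcost = 0.01` is taken from the file above):

costperop, nops = 0.0002, 3000   # hypothetical: 3000 items at 0.2ms each
workers, startupcost = 4, 0.01   # 0.01 is the posix _startupcost above

linear = costperop * nops                                      # 0.6s serially
benefit = linear - (startupcost * workers + linear / workers)  # 0.6 - 0.19
print(benefit >= 0.15)                                         # True (0.41)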
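The parent/child protocol in `_posixworker()` is line based: each worker writes `'%d %s\n' % (i, item)` to the shared pipe for every completed item, and the parent splits each line once on the first space and strips the trailing newline to recover the `(index, item)` pair it yields. A small illustration with a made-up item:

line = '42 path/to/file\n'        # what a worker would write for item 42
l = line.split(' ', 1)
print(int(l[0]), l[1][:-1])       # -> 42 path/to/file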
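Finally, `partition()` stripes the input (every Nth element) rather than cutting it into contiguous blocks, which keeps the slices within one element of each other in size, at the cost of the ordering concerns described in its docstring. For example:

def partition(lst, nslices):
    for i in range(nslices):
        yield lst[i::nslices]

print(list(partition(list(range(10)), 3)))
# [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]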