worker: discard waited pid by anyone who noticed it first...
Yuya Nishihara
r30425:03f7aa2b default
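
Both the SIGCHLD handler (sigchldhandler) and the final cleanup() invoke waitforworkers(), so two callers can race to reap the same child. If the handler fires just after an interrupted waitpid() has reaped a pid but before pids is updated, the nested call now gets ECHILD, concludes the child was already reaped, and discards the pid itself; switching pids.remove() to pids.discard() lets the other caller tolerate a pid that has already been dropped instead of raising KeyError.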
@@ -1,207 +1,210 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.

 from __future__ import absolute_import

 import errno
 import os
 import signal
 import sys

 from .i18n import _
 from . import (
     error,
     util,
 )

 def countcpus():
     '''try to count the number of CPUs on the system'''

     # posix
     try:
         n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass

     # windows
     try:
         n = int(os.environ['NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass

     return 1

 def _numworkers(ui):
     s = ui.config('worker', 'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)

 if os.name == 'posix':
     _startupcost = 0.01
 else:
     _startupcost = 1e30

 def worthwhile(ui, costperop, nops):
     '''try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them'''
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_startupcost * workers + linear / workers)
     return benefit >= 0.15

 def worker(ui, costperarg, func, staticargs, args):
     '''run a function, possibly in parallel in multiple worker
     processes.

     returns a progress iterator

     costperarg - cost of a single task

     func - function to run

     staticargs - arguments to pass to every invocation of the function

     args - arguments to split into chunks, to pass to individual
     workers
     '''
     if worthwhile(ui, costperarg, len(args)):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))

 def _posixworker(ui, func, staticargs, args):
     rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
     def killworkers():
         # unregister SIGCHLD handler as all children will be killed. This
         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
         # could be updated while iterating, which would cause inconsistency.
         signal.signal(signal.SIGCHLD, oldchldhandler)
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
     def waitforworkers(blocking=True):
         for pid in pids.copy():
             p = st = 0
             while True:
                 try:
                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                     break
                 except OSError as e:
                     if e.errno == errno.EINTR:
                         continue
                     elif e.errno == errno.ECHILD:
-                        break # ignore ECHILD
+                        # child would already be reaped, but pids not yet
+                        # updated (maybe interrupted just after waitpid)
+                        pids.discard(pid)
+                        break
                     else:
                         raise
             if p:
-                pids.remove(p)
+                pids.discard(p)
                 st = _exitstatus(st)
                 if st and not problem[0]:
                     problem[0] = st
     def sigchldhandler(signum, frame):
         waitforworkers(blocking=False)
         if problem[0]:
             killworkers()
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
             signal.signal(signal.SIGINT, oldhandler)
             signal.signal(signal.SIGCHLD, oldchldhandler)
             try:
                 os.close(rfd)
                 for i, item in func(*(staticargs + (pargs,))):
                     os.write(wfd, '%d %s\n' % (i, item))
                 os._exit(0)
             except KeyboardInterrupt:
                 os._exit(255)
                 # other exceptions are allowed to propagate, we rely
                 # on lock.py's pid checks to avoid release callbacks
         pids.add(pid)
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
         signal.signal(signal.SIGCHLD, oldchldhandler)
         status = problem[0]
         if status:
             if status < 0:
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
         for line in util.iterfile(fp):
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
         killworkers()
         cleanup()
         raise
     cleanup()

 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv

     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)

 if os.name != 'nt':
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus

 def partition(lst, nslices):
     '''partition a list into N slices of roughly equal size

     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.

     mpm is not a fan of this partitioning strategy when files are involved.
     In his words:

         Single-threaded Mercurial makes a point of creating and visiting
         files in a fixed order (alphabetical). When creating files in order,
         a typical filesystem is likely to allocate them on nearby regions on
         disk. Thus, when revisiting in the same order, locality is maximized
         and various forms of OS and disk-level caching and read-ahead get a
         chance to work.

         This effect can be quite significant on spinning disks. I discovered it
         circa Mercurial v0.4 when revlogs were named by hashes of filenames.
         Tarring a repo and copying it to another disk effectively randomized
         the revlog ordering on disk by sorting the revlogs by hash and suddenly
         performance of my kernel checkout benchmark dropped by ~10x because the
         "working set" of sectors visited no longer fit in the drive's cache and
         the workload switched from streaming to random I/O.

         What we should really be doing is have workers read filenames from an
         ordered queue. This preserves locality and also keeps any worker from
         getting more than one file out of balance.
     '''
     for i in range(nslices):
         yield lst[i::nslices]
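
A few usage notes on the code above. _numworkers() lets the pool size be pinned through the [worker] config section; without it, the count falls back to min(max(countcpus(), 4), 32). A hypothetical hgrc entry:

[worker]
numcpus = 8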
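
worthwhile() weighs the estimated serial cost against fork startup plus the per-worker share of the work. With hypothetical figures (not from this changeset), 1000 tasks at 1ms each and 4 workers on posix give:

linear  = 0.001 * 1000 = 1.0
benefit = 1.0 - (0.01 * 4 + 1.0 / 4) = 1.0 - 0.29 = 0.71

which clears the 0.15 threshold, so worker() takes the parallel path.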
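
A minimal sketch of a caller, assuming a hypothetical per-item function (makefiles, repo, and names are illustrative, not part of this file); func must yield (index, item) pairs, which _posixworker writes over the pipe one per line:

def makefiles(ui, repo, chunk):
    # hypothetical worker body: process one chunk of names
    for i, name in enumerate(chunk):
        # ... per-file work here ...
        yield i, name

# staticargs (ui, repo) is prepended to each chunk made by partition()
for i, item in worker(ui, 0.001, makefiles, (ui, repo), names):
    ui.progress('building', i, item=item)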
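
partition() takes every Nth element, so slice sizes stay within one element of each other; a quick interactive check on a toy input:

>>> def partition(lst, nslices):
...     for i in range(nslices):
...         yield lst[i::nslices]
...
>>> list(partition(list(range(10)), 3))
[[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]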
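
The docstring's closing suggestion, workers pulling filenames one at a time from a single ordered queue, could look roughly like the sketch below (multiprocessing is used purely for illustration; none of these names exist in Mercurial):

import multiprocessing

def _run(queue, func):
    # each worker consumes one name at a time, staying close to disk order
    while True:
        name = queue.get()
        if name is None:              # sentinel: no work left
            return
        func(name)

def orderedqueuework(func, names, nworkers):
    queue = multiprocessing.Queue(maxsize=nworkers)
    procs = [multiprocessing.Process(target=_run, args=(queue, func))
             for _ in range(nworkers)]
    for p in procs:
        p.start()
    for name in names:                # fed in sorted, on-disk order
        queue.put(name)
    for _ in procs:
        queue.put(None)               # one sentinel per worker
    for p in procs:
        p.join()

This keeps any worker at most one item out of balance, at the cost of per-item queue traffic.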