worker: make sure killworkers() never be interrupted by another SIGCHLD...
Yuya Nishihara
r30423:237b2883 default
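This commit moves the SIGCHLD-handler unregistration from waitforworkers() to the top of killworkers(): killworkers() iterates over the shared pids set, while the SIGCHLD handler (via waitforworkers(blocking=False)) removes reaped pids from that same set, so a signal delivered mid-iteration could mutate the set being walked. A minimal sketch of that hazard, using nothing beyond stock Python (illustrative, not Mercurial code):

```python
# Mutating a set while another frame iterates over it is an error --
# exactly what a SIGCHLD handler reaping into `pids` could do inside
# killworkers()'s loop before this change.
pids = {101, 102, 103}

def reap_like_sigchld_handler():
    pids.discard(102)                # waitforworkers() does pids.remove(p)

try:
    for p in pids:
        reap_like_sigchld_handler()  # imagine the handler firing here
except RuntimeError as e:
    print('iteration broke:', e)     # "Set changed size during iteration"
```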
@@ -1,204 +1,206 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 error,
18 18 util,
19 19 )
20 20
21 21 def countcpus():
22 22 '''try to count the number of CPUs on the system'''
23 23
24 24 # posix
25 25 try:
26 26 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
27 27 if n > 0:
28 28 return n
29 29 except (AttributeError, ValueError):
30 30 pass
31 31
32 32 # windows
33 33 try:
34 34 n = int(os.environ['NUMBER_OF_PROCESSORS'])
35 35 if n > 0:
36 36 return n
37 37 except (KeyError, ValueError):
38 38 pass
39 39
40 40 return 1
41 41
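(As an aside: on Python 3.4+ the same probing exists as os.cpu_count(), which returns None when the count cannot be determined; countcpus() predates it and keeps the py2-compatible fallbacks explicit.)

```python
import os

# Rough modern equivalent of countcpus(): os.cpu_count() covers both the
# sysconf and the NUMBER_OF_PROCESSORS paths probed manually above.
print(os.cpu_count() or 1)
```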
42 42 def _numworkers(ui):
43 43 s = ui.config('worker', 'numcpus')
44 44 if s:
45 45 try:
46 46 n = int(s)
47 47 if n >= 1:
48 48 return n
49 49 except ValueError:
50 50 raise error.Abort(_('number of cpus must be an integer'))
51 51 return min(max(countcpus(), 4), 32)
52 52
53 53 if os.name == 'posix':
54 54 _startupcost = 0.01
55 55 else:
56 56 _startupcost = 1e30
57 57
58 58 def worthwhile(ui, costperop, nops):
59 59 '''try to determine whether the benefit of multiple processes can
60 60 outweigh the cost of starting them'''
61 61 linear = costperop * nops
62 62 workers = _numworkers(ui)
63 63 benefit = linear - (_startupcost * workers + linear / workers)
64 64 return benefit >= 0.15
65 65
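To see the heuristic in action: with the posix _startupcost of 0.01, four workers, and 1000 operations at 1ms each, the estimated saving clears the 0.15s threshold comfortably:

```python
# Plugging sample numbers into worthwhile()'s formula:
costperop, nops, workers, _startupcost = 0.001, 1000, 4, 0.01
linear = costperop * nops                 # 1.0s of single-process work
benefit = linear - (_startupcost * workers + linear / workers)
print(benefit)                            # 0.71 >= 0.15 -> parallelize
```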
66 66 def worker(ui, costperarg, func, staticargs, args):
67 67 '''run a function, possibly in parallel in multiple worker
68 68 processes.
69 69
70 70 returns a progress iterator
71 71
72 72 costperarg - cost of a single task
73 73
74 74 func - function to run
75 75
76 76 staticargs - arguments to pass to every invocation of the function
77 77
78 78 args - arguments to split into chunks, to pass to individual
79 79 workers
80 80 '''
81 81 if worthwhile(ui, costperarg, len(args)):
82 82 return _platformworker(ui, func, staticargs, args)
83 83 return func(*staticargs + (args,))
84 84
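The contract on func is implicit but strict: it must yield (int, str) pairs, because the posix path below serializes each result as '%d %s\n' over a pipe, so the string part must not contain a newline. A hypothetical caller might look like this (names are illustrative, not from this diff):

```python
# Hypothetical worker function: one unit of progress per processed name.
def process_names(tag, names):
    for name in names:
        # ... real per-item work would go here ...
        yield 1, '%s:%s' % (tag, name)   # (int, newline-free str)

# Illustrative call; `ui` would come from the surrounding Mercurial code:
# for n, item in worker(ui, 0.001, process_names, ('done',), names):
#     ...advance a progress bar by n...
```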
85 85 def _posixworker(ui, func, staticargs, args):
86 86 rfd, wfd = os.pipe()
87 87 workers = _numworkers(ui)
88 88 oldhandler = signal.getsignal(signal.SIGINT)
89 89 signal.signal(signal.SIGINT, signal.SIG_IGN)
90 90 pids, problem = set(), [0]
91 91 def killworkers():
92 # unregister SIGCHLD handler as all children will be killed. This
93 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
94 # could be updated while iterating, which would cause inconsistency.
95 signal.signal(signal.SIGCHLD, oldchldhandler)
92 96 # if one worker bails, there's no good reason to wait for the rest
93 97 for p in pids:
94 98 try:
95 99 os.kill(p, signal.SIGTERM)
96 100 except OSError as err:
97 101 if err.errno != errno.ESRCH:
98 102 raise
99 103 def waitforworkers(blocking=True):
100 104 for pid in pids.copy():
101 105 p = st = 0
102 106 while True:
103 107 try:
104 108 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
105 109 break
106 110 except OSError as e:
107 111 if e.errno == errno.EINTR:
108 112 continue
109 113 elif e.errno == errno.ECHILD:
110 114 break # ignore ECHILD
111 115 else:
112 116 raise
113 117 if p:
114 118 pids.remove(p)
115 119 st = _exitstatus(st)
116 120 if st and not problem[0]:
117 121 problem[0] = st
118 # unregister SIGCHLD handler as all children will be killed
119 signal.signal(signal.SIGCHLD, oldchldhandler)
120 122 killworkers()
121 123 def sigchldhandler(signum, frame):
122 124 waitforworkers(blocking=False)
123 125 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
124 126 for pargs in partition(args, workers):
125 127 pid = os.fork()
126 128 if pid == 0:
127 129 signal.signal(signal.SIGINT, oldhandler)
128 130 signal.signal(signal.SIGCHLD, oldchldhandler)
129 131 try:
130 132 os.close(rfd)
131 133 for i, item in func(*(staticargs + (pargs,))):
132 134 os.write(wfd, '%d %s\n' % (i, item))
133 135 os._exit(0)
134 136 except KeyboardInterrupt:
135 137 os._exit(255)
136 138 # other exceptions are allowed to propagate, we rely
137 139 # on lock.py's pid checks to avoid release callbacks
138 140 pids.add(pid)
139 141 os.close(wfd)
140 142 fp = os.fdopen(rfd, 'rb', 0)
141 143 def cleanup():
142 144 signal.signal(signal.SIGINT, oldhandler)
143 145 waitforworkers()
144 146 signal.signal(signal.SIGCHLD, oldchldhandler)
145 147 status = problem[0]
146 148 if status:
147 149 if status < 0:
148 150 os.kill(os.getpid(), -status)
149 151 sys.exit(status)
150 152 try:
151 153 for line in util.iterfile(fp):
152 154 l = line.split(' ', 1)
153 155 yield int(l[0]), l[1][:-1]
154 156 except: # re-raises
155 157 killworkers()
156 158 cleanup()
157 159 raise
158 160 cleanup()
159 161
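The non-blocking reap in sigchldhandler() leans on os.WNOHANG: os.waitpid(pid, os.WNOHANG) returns (0, 0) while the child is still running instead of blocking, which is what makes it tolerable to run inside a signal handler. A posix-only demonstration (timing-dependent, hence the hedged comment):

```python
import os
import time

pid = os.fork()
if pid == 0:
    time.sleep(0.2)
    os._exit(7)

# Non-blocking: the child is almost certainly still sleeping -> (0, 0).
print(os.waitpid(pid, os.WNOHANG))

time.sleep(0.4)
p, st = os.waitpid(pid, 0)           # blocking reap after it exits
print(p == pid, os.WEXITSTATUS(st))  # True 7
```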
160 162 def _posixexitstatus(code):
161 163 '''convert a posix exit status into the same form returned by
162 164 os.spawnv
163 165
164 166 returns None if the process was stopped instead of exiting'''
165 167 if os.WIFEXITED(code):
166 168 return os.WEXITSTATUS(code)
167 169 elif os.WIFSIGNALED(code):
168 170 return -os.WTERMSIG(code)
169 171
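For example, a child terminated by SIGTERM (the signal killworkers() sends) comes back from this conversion as -15. A self-contained posix check:

```python
import os
import signal

pid = os.fork()
if pid == 0:
    signal.signal(signal.SIGTERM, signal.SIG_DFL)  # ensure default action
    os.kill(os.getpid(), signal.SIGTERM)
    os._exit(1)                                    # not reached
_, st = os.waitpid(pid, 0)
if os.WIFSIGNALED(st):
    print(-os.WTERMSIG(st))  # -15, the form _posixexitstatus() returns
```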
170 172 if os.name != 'nt':
171 173 _platformworker = _posixworker
172 174 _exitstatus = _posixexitstatus
173 175
174 176 def partition(lst, nslices):
175 177 '''partition a list into N slices of roughly equal size
176 178
177 179 The current strategy takes every Nth element from the input. If
178 180 we ever write workers that need to preserve grouping in input
179 181 we should consider allowing callers to specify a partition strategy.
180 182
181 183 mpm is not a fan of this partitioning strategy when files are involved.
182 184 In his words:
183 185
184 186 Single-threaded Mercurial makes a point of creating and visiting
185 187 files in a fixed order (alphabetical). When creating files in order,
186 188 a typical filesystem is likely to allocate them on nearby regions on
187 189 disk. Thus, when revisiting in the same order, locality is maximized
188 190 and various forms of OS and disk-level caching and read-ahead get a
189 191 chance to work.
190 192
191 193 This effect can be quite significant on spinning disks. I discovered it
192 194 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
193 195 Tarring a repo and copying it to another disk effectively randomized
194 196 the revlog ordering on disk by sorting the revlogs by hash and suddenly
195 197 performance of my kernel checkout benchmark dropped by ~10x because the
196 198 "working set" of sectors visited no longer fit in the drive's cache and
197 199 the workload switched from streaming to random I/O.
198 200
199 201 What we should really be doing is have workers read filenames from an
200 202 ordered queue. This preserves locality and also keeps any worker from
201 203 getting more than one file out of balance.
202 204 '''
203 205 for i in range(nslices):
204 206 yield lst[i::nslices]
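The striding in partition() in action, showing the round-robin interleave (and why mpm dislikes it for files: members of one slice sit far apart in the original ordering):

```python
def partition(lst, nslices):
    for i in range(nslices):
        yield lst[i::nslices]

print(list(partition(list(range(10)), 3)))
# [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]] -- every Nth element per slice
```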