worker: kill workers after all zombie processes are reaped...
Yuya Nishihara
r30424:f2d13eb8 default
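
This changeset moves the killworkers() call out of waitforworkers(): rather than terminating the surviving workers the instant one child reports a bad status, the SIGCHLD handler first reaps every child that has already exited (waitforworkers(blocking=False)) and only then, if a failure was recorded in problem[0], kills the rest. A minimal, self-contained sketch of that reap-then-kill pattern (not the Mercurial source; the names and bookkeeping are deliberately simplified):

    import errno
    import os
    import signal

    pids = set()       # pids of live worker processes (stand-in for the real set)
    problem = [0]      # first non-zero exit status seen, if any

    def reap_exited_children():
        # collect every child that has already terminated, without blocking
        for pid in pids.copy():
            try:
                p, st = os.waitpid(pid, os.WNOHANG)
            except OSError as e:
                if e.errno != errno.ECHILD:
                    raise
                continue
            if p:
                pids.remove(p)
                if st and not problem[0]:
                    problem[0] = st

    def on_sigchld(signum, frame):
        # reap first, so every zombie is collected before any kill is issued ...
        reap_exited_children()
        # ... then terminate the remaining workers only if something went wrong
        if problem[0]:
            for pid in pids:
                try:
                    os.kill(pid, signal.SIGTERM)
                except OSError as e:
                    if e.errno != errno.ESRCH:
                        raise

    signal.signal(signal.SIGCHLD, on_sigchld)

The real killworkers() also unregisters the SIGCHLD handler before killing, so the pid set cannot change while it is being iterated; that step is omitted here for brevity.
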
@@ -1,206 +1,207 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 error,
18 18 util,
19 19 )
20 20
21 21 def countcpus():
22 22 '''try to count the number of CPUs on the system'''
23 23
24 24 # posix
25 25 try:
26 26 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
27 27 if n > 0:
28 28 return n
29 29 except (AttributeError, ValueError):
30 30 pass
31 31
32 32 # windows
33 33 try:
34 34 n = int(os.environ['NUMBER_OF_PROCESSORS'])
35 35 if n > 0:
36 36 return n
37 37 except (KeyError, ValueError):
38 38 pass
39 39
40 40 return 1
41 41
42 42 def _numworkers(ui):
43 43 s = ui.config('worker', 'numcpus')
44 44 if s:
45 45 try:
46 46 n = int(s)
47 47 if n >= 1:
48 48 return n
49 49 except ValueError:
50 50 raise error.Abort(_('number of cpus must be an integer'))
51 51 return min(max(countcpus(), 4), 32)
52 52
53 53 if os.name == 'posix':
54 54 _startupcost = 0.01
55 55 else:
56 56 _startupcost = 1e30
57 57
58 58 def worthwhile(ui, costperop, nops):
59 59 '''try to determine whether the benefit of multiple processes can
60 60 outweigh the cost of starting them'''
61 61 linear = costperop * nops
62 62 workers = _numworkers(ui)
63 63 benefit = linear - (_startupcost * workers + linear / workers)
64 64 return benefit >= 0.15
65 65
66 66 def worker(ui, costperarg, func, staticargs, args):
67 67 '''run a function, possibly in parallel in multiple worker
68 68 processes.
69 69
70 70 returns a progress iterator
71 71
72 72 costperarg - cost of a single task
73 73
74 74 func - function to run
75 75
76 76 staticargs - arguments to pass to every invocation of the function
77 77
78 78 args - arguments to split into chunks, to pass to individual
79 79 workers
80 80 '''
81 81 if worthwhile(ui, costperarg, len(args)):
82 82 return _platformworker(ui, func, staticargs, args)
83 83 return func(*staticargs + (args,))
84 84
85 85 def _posixworker(ui, func, staticargs, args):
86 86 rfd, wfd = os.pipe()
87 87 workers = _numworkers(ui)
88 88 oldhandler = signal.getsignal(signal.SIGINT)
89 89 signal.signal(signal.SIGINT, signal.SIG_IGN)
90 90 pids, problem = set(), [0]
91 91 def killworkers():
92 92 # unregister SIGCHLD handler as all children will be killed. This
93 93 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
94 94 # could be updated while iterating, which would cause inconsistency.
95 95 signal.signal(signal.SIGCHLD, oldchldhandler)
96 96 # if one worker bails, there's no good reason to wait for the rest
97 97 for p in pids:
98 98 try:
99 99 os.kill(p, signal.SIGTERM)
100 100 except OSError as err:
101 101 if err.errno != errno.ESRCH:
102 102 raise
103 103 def waitforworkers(blocking=True):
104 104 for pid in pids.copy():
105 105 p = st = 0
106 106 while True:
107 107 try:
108 108 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
109 109 break
110 110 except OSError as e:
111 111 if e.errno == errno.EINTR:
112 112 continue
113 113 elif e.errno == errno.ECHILD:
114 114 break # ignore ECHILD
115 115 else:
116 116 raise
117 117 if p:
118 118 pids.remove(p)
119 119 st = _exitstatus(st)
120 120 if st and not problem[0]:
121 121 problem[0] = st
122 killworkers()
123 122 def sigchldhandler(signum, frame):
124 123 waitforworkers(blocking=False)
124 if problem[0]:
125 killworkers()
125 126 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
126 127 for pargs in partition(args, workers):
127 128 pid = os.fork()
128 129 if pid == 0:
129 130 signal.signal(signal.SIGINT, oldhandler)
130 131 signal.signal(signal.SIGCHLD, oldchldhandler)
131 132 try:
132 133 os.close(rfd)
133 134 for i, item in func(*(staticargs + (pargs,))):
134 135 os.write(wfd, '%d %s\n' % (i, item))
135 136 os._exit(0)
136 137 except KeyboardInterrupt:
137 138 os._exit(255)
138 139 # other exceptions are allowed to propagate, we rely
139 140 # on lock.py's pid checks to avoid release callbacks
140 141 pids.add(pid)
141 142 os.close(wfd)
142 143 fp = os.fdopen(rfd, 'rb', 0)
143 144 def cleanup():
144 145 signal.signal(signal.SIGINT, oldhandler)
145 146 waitforworkers()
146 147 signal.signal(signal.SIGCHLD, oldchldhandler)
147 148 status = problem[0]
148 149 if status:
149 150 if status < 0:
150 151 os.kill(os.getpid(), -status)
151 152 sys.exit(status)
152 153 try:
153 154 for line in util.iterfile(fp):
154 155 l = line.split(' ', 1)
155 156 yield int(l[0]), l[1][:-1]
156 157 except: # re-raises
157 158 killworkers()
158 159 cleanup()
159 160 raise
160 161 cleanup()
161 162
162 163 def _posixexitstatus(code):
163 164 '''convert a posix exit status into the same form returned by
164 165 os.spawnv
165 166
166 167 returns None if the process was stopped instead of exiting'''
167 168 if os.WIFEXITED(code):
168 169 return os.WEXITSTATUS(code)
169 170 elif os.WIFSIGNALED(code):
170 171 return -os.WTERMSIG(code)
171 172
172 173 if os.name != 'nt':
173 174 _platformworker = _posixworker
174 175 _exitstatus = _posixexitstatus
175 176
176 177 def partition(lst, nslices):
177 178 '''partition a list into N slices of roughly equal size
178 179
179 180 The current strategy takes every Nth element from the input. If
180 181 we ever write workers that need to preserve grouping in input
181 182 we should consider allowing callers to specify a partition strategy.
182 183
183 184 mpm is not a fan of this partitioning strategy when files are involved.
184 185 In his words:
185 186
186 187 Single-threaded Mercurial makes a point of creating and visiting
187 188 files in a fixed order (alphabetical). When creating files in order,
188 189 a typical filesystem is likely to allocate them on nearby regions on
189 190 disk. Thus, when revisiting in the same order, locality is maximized
190 191 and various forms of OS and disk-level caching and read-ahead get a
191 192 chance to work.
192 193
193 194 This effect can be quite significant on spinning disks. I discovered it
194 195 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
195 196 Tarring a repo and copying it to another disk effectively randomized
196 197 the revlog ordering on disk by sorting the revlogs by hash and suddenly
197 198 performance of my kernel checkout benchmark dropped by ~10x because the
198 199 "working set" of sectors visited no longer fit in the drive's cache and
199 200 the workload switched from streaming to random I/O.
200 201
201 202 What we should really be doing is to have workers read filenames from an
202 203 ordered queue. This preserves locality and also keeps any worker from
203 204 getting more than one file out of balance.
204 205 '''
205 206 for i in range(nslices):
206 207 yield lst[i::nslices]
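
To make the "every Nth element" strategy concrete, partition() simply yields the stride slices lst[i::nslices]. For a ten-element list split three ways (using the partition() defined above):

    # round-robin slicing: slice i takes elements i, i + nslices, i + 2*nslices, ...
    list(partition(list(range(10)), 3))
    # -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]

Each worker therefore receives roughly len(lst) / nslices items, interleaved across the original ordering, which is exactly the property mpm objects to for file-heavy workloads.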
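
A quick worked example of the cost model in worthwhile() (the numbers are invented for illustration): on POSIX, _startupcost is 0.01, so with 4 workers a job whose serial cost is linear = costperop * nops = 1.0 yields benefit = 1.0 - (0.01 * 4 + 1.0 / 4) = 0.71, comfortably above the 0.15 threshold, and the work is farmed out to workers; a job with linear = 0.2 yields benefit = 0.2 - (0.04 + 0.05) = 0.11 and runs in a single process. On Windows the _startupcost of 1e30 makes the benefit negative for any realistic job, so the parallel path is effectively disabled there (and _platformworker is only assigned when os.name != 'nt' anyway).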
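
One detail worth spelling out: _posixexitstatus() follows the os.spawnv convention, returning the exit code for a child that exited normally and the negated signal number for a child that was killed (for example, -15 for SIGTERM on a typical Linux system). cleanup() relies on that sign: a positive status is passed to sys.exit(), while a negative one makes the parent re-raise the same signal on itself with os.kill(os.getpid(), -status), so the parent terminates the same way the failed worker did.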