worker: fix missed break on successful waitpid()...
Yuya Nishihara
r30422:0e6ce631 default
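The change below adds the missing break after a successful os.waitpid() call in the reaping loop, so the loop stops once the child has actually been collected instead of calling waitpid() on it again. A minimal, self-contained sketch of the corrected loop shape (the reap() helper name is hypothetical, not part of worker.py):

    import errno
    import os

    def reap(pid, blocking=True):
        '''wait for one child pid, retrying on EINTR and tolerating ECHILD'''
        p = st = 0
        while True:
            try:
                p, st = os.waitpid(pid, 0 if blocking else os.WNOHANG)
                break  # success: this break is what the commit adds
            except OSError as e:
                if e.errno == errno.EINTR:
                    continue  # interrupted by a signal, retry
                elif e.errno == errno.ECHILD:
                    break  # child already reaped elsewhere
                else:
                    raise
        return p, st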
@@ -1,203 +1,204 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 error,
18 18 util,
19 19 )
20 20
21 21 def countcpus():
22 22 '''try to count the number of CPUs on the system'''
23 23
24 24 # posix
25 25 try:
26 26 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
27 27 if n > 0:
28 28 return n
29 29 except (AttributeError, ValueError):
30 30 pass
31 31
32 32 # windows
33 33 try:
34 34 n = int(os.environ['NUMBER_OF_PROCESSORS'])
35 35 if n > 0:
36 36 return n
37 37 except (KeyError, ValueError):
38 38 pass
39 39
40 40 return 1
41 41
42 42 def _numworkers(ui):
43 43 s = ui.config('worker', 'numcpus')
44 44 if s:
45 45 try:
46 46 n = int(s)
47 47 if n >= 1:
48 48 return n
49 49 except ValueError:
50 50 raise error.Abort(_('number of cpus must be an integer'))
51 51 return min(max(countcpus(), 4), 32)
52 52
53 53 if os.name == 'posix':
54 54 _startupcost = 0.01
55 55 else:
56 56 _startupcost = 1e30
57 57
58 58 def worthwhile(ui, costperop, nops):
59 59 '''try to determine whether the benefit of multiple processes can
60 60 outweigh the cost of starting them'''
61 61 linear = costperop * nops
62 62 workers = _numworkers(ui)
63 63 benefit = linear - (_startupcost * workers + linear / workers)
64 64 return benefit >= 0.15
65 65
66 66 def worker(ui, costperarg, func, staticargs, args):
67 67 '''run a function, possibly in parallel in multiple worker
68 68 processes.
69 69
70 70 returns a progress iterator
71 71
72 72 costperarg - cost of a single task
73 73
74 74 func - function to run
75 75
76 76 staticargs - arguments to pass to every invocation of the function
77 77
78 78 args - arguments to split into chunks, to pass to individual
79 79 workers
80 80 '''
81 81 if worthwhile(ui, costperarg, len(args)):
82 82 return _platformworker(ui, func, staticargs, args)
83 83 return func(*staticargs + (args,))
84 84
85 85 def _posixworker(ui, func, staticargs, args):
86 86 rfd, wfd = os.pipe()
87 87 workers = _numworkers(ui)
88 88 oldhandler = signal.getsignal(signal.SIGINT)
89 89 signal.signal(signal.SIGINT, signal.SIG_IGN)
90 90 pids, problem = set(), [0]
91 91 def killworkers():
92 92 # if one worker bails, there's no good reason to wait for the rest
93 93 for p in pids:
94 94 try:
95 95 os.kill(p, signal.SIGTERM)
96 96 except OSError as err:
97 97 if err.errno != errno.ESRCH:
98 98 raise
99 99 def waitforworkers(blocking=True):
100 100 for pid in pids.copy():
101 101 p = st = 0
102 102 while True:
103 103 try:
104 104 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
105 break
105 106 except OSError as e:
106 107 if e.errno == errno.EINTR:
107 108 continue
108 109 elif e.errno == errno.ECHILD:
109 110 break # ignore ECHILD
110 111 else:
111 112 raise
112 113 if p:
113 114 pids.remove(p)
114 115 st = _exitstatus(st)
115 116 if st and not problem[0]:
116 117 problem[0] = st
117 118 # unregister SIGCHLD handler as all children will be killed
118 119 signal.signal(signal.SIGCHLD, oldchldhandler)
119 120 killworkers()
120 121 def sigchldhandler(signum, frame):
121 122 waitforworkers(blocking=False)
122 123 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
123 124 for pargs in partition(args, workers):
124 125 pid = os.fork()
125 126 if pid == 0:
126 127 signal.signal(signal.SIGINT, oldhandler)
127 128 signal.signal(signal.SIGCHLD, oldchldhandler)
128 129 try:
129 130 os.close(rfd)
130 131 for i, item in func(*(staticargs + (pargs,))):
131 132 os.write(wfd, '%d %s\n' % (i, item))
132 133 os._exit(0)
133 134 except KeyboardInterrupt:
134 135 os._exit(255)
135 136 # other exceptions are allowed to propagate, we rely
136 137 # on lock.py's pid checks to avoid release callbacks
137 138 pids.add(pid)
138 139 os.close(wfd)
139 140 fp = os.fdopen(rfd, 'rb', 0)
140 141 def cleanup():
141 142 signal.signal(signal.SIGINT, oldhandler)
142 143 waitforworkers()
143 144 signal.signal(signal.SIGCHLD, oldchldhandler)
144 145 status = problem[0]
145 146 if status:
146 147 if status < 0:
147 148 os.kill(os.getpid(), -status)
148 149 sys.exit(status)
149 150 try:
150 151 for line in util.iterfile(fp):
151 152 l = line.split(' ', 1)
152 153 yield int(l[0]), l[1][:-1]
153 154 except: # re-raises
154 155 killworkers()
155 156 cleanup()
156 157 raise
157 158 cleanup()
158 159
159 160 def _posixexitstatus(code):
160 161 '''convert a posix exit status into the same form returned by
161 162 os.spawnv
162 163
163 164 returns None if the process was stopped instead of exiting'''
164 165 if os.WIFEXITED(code):
165 166 return os.WEXITSTATUS(code)
166 167 elif os.WIFSIGNALED(code):
167 168 return -os.WTERMSIG(code)
168 169
169 170 if os.name != 'nt':
170 171 _platformworker = _posixworker
171 172 _exitstatus = _posixexitstatus
172 173
173 174 def partition(lst, nslices):
174 175 '''partition a list into N slices of roughly equal size
175 176
176 177 The current strategy takes every Nth element from the input. If
177 178 we ever write workers that need to preserve grouping in input
178 179 we should consider allowing callers to specify a partition strategy.
179 180
180 181 mpm is not a fan of this partitioning strategy when files are involved.
181 182 In his words:
182 183
183 184 Single-threaded Mercurial makes a point of creating and visiting
184 185 files in a fixed order (alphabetical). When creating files in order,
185 186 a typical filesystem is likely to allocate them on nearby regions on
186 187 disk. Thus, when revisiting in the same order, locality is maximized
187 188 and various forms of OS and disk-level caching and read-ahead get a
188 189 chance to work.
189 190
190 191 This effect can be quite significant on spinning disks. I discovered it
191 192 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
192 193 Tarring a repo and copying it to another disk effectively randomized
193 194 the revlog ordering on disk by sorting the revlogs by hash and suddenly
194 195 performance of my kernel checkout benchmark dropped by ~10x because the
195 196 "working set" of sectors visited no longer fit in the drive's cache and
196 197 the workload switched from streaming to random I/O.
197 198
198 199 What we should really be doing is have workers read filenames from an
199 200 ordered queue. This preserves locality and also keeps any worker from
200 201 getting more than one file out of balance.
201 202 '''
202 203 for i in range(nslices):
203 204 yield lst[i::nslices]
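For orientation, a rough sketch of how these pieces fit together. The numbers and the copyfiles/(src, dst) call shape are assumptions for illustration only, not Mercurial's actual callers:

    # partition() interleaves the input, handing every Nth element to a slice:
    list(partition(list(range(10)), 3))
    # -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]

    # worthwhile() compares the linear cost against the parallel cost. With
    # illustrative values costperop=0.01, nops=1000, workers=4 and the posix
    # _startupcost of 0.01:
    #   linear  = 0.01 * 1000 = 10
    #   benefit = 10 - (0.01 * 4 + 10 / 4) = 7.46 >= 0.15  -> parallelize
    #
    # worker() then either calls func inline or forks, yielding (index, item)
    # progress pairs that the children write back over the pipe, e.g.:
    #   for i, item in worker(ui, 0.01, copyfiles, (src, dst), files):
    #       ...  # report progress for (i, item)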