worker: discard waited pid by anyone who noticed it first

Yuya Nishihara
r30425:03f7aa2b default

Both the SIGCHLD handler (via waitforworkers(blocking=False)) and the
blocking waitforworkers() call in cleanup() can try to reap the same child.
Whichever path notices the exit first now drops the pid from the shared set:
an ECHILD from os.waitpid() means another caller already reaped the child
but pids was not yet updated, so the pid is discarded rather than silently
ignored, and pids.remove(p) becomes pids.discard(p) so a pid already dropped
by the other path cannot raise KeyError.
@@ -1,207 +1,210 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import errno
 import os
 import signal
 import sys
 
 from .i18n import _
 from . import (
     error,
     util,
 )
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
 
     # posix
     try:
         n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass
 
     # windows
     try:
         n = int(os.environ['NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass
 
     return 1
 
 def _numworkers(ui):
     s = ui.config('worker', 'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
 if os.name == 'posix':
     _startupcost = 0.01
 else:
     _startupcost = 1e30
 
 def worthwhile(ui, costperop, nops):
     '''try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them'''
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_startupcost * workers + linear / workers)
     return benefit >= 0.15
 
 def worker(ui, costperarg, func, staticargs, args):
     '''run a function, possibly in parallel in multiple worker
     processes.
 
     returns a progress iterator
 
     costperarg - cost of a single task
 
     func - function to run
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
     workers
     '''
     if worthwhile(ui, costperarg, len(args)):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
     rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
     def killworkers():
         # unregister SIGCHLD handler as all children will be killed. This
         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
         # could be updated while iterating, which would cause inconsistency.
         signal.signal(signal.SIGCHLD, oldchldhandler)
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
     def waitforworkers(blocking=True):
         for pid in pids.copy():
             p = st = 0
             while True:
                 try:
                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                     break
                 except OSError as e:
                     if e.errno == errno.EINTR:
                         continue
                     elif e.errno == errno.ECHILD:
-                        break # ignore ECHILD
+                        # child already reaped, but pids not yet updated
+                        # (maybe interrupted just after waitpid)
+                        pids.discard(pid)
+                        break
                     else:
                         raise
             if p:
-                pids.remove(p)
+                pids.discard(p)
                 st = _exitstatus(st)
                 if st and not problem[0]:
                     problem[0] = st
     def sigchldhandler(signum, frame):
         waitforworkers(blocking=False)
         if problem[0]:
             killworkers()
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
             signal.signal(signal.SIGINT, oldhandler)
             signal.signal(signal.SIGCHLD, oldchldhandler)
             try:
                 os.close(rfd)
                 for i, item in func(*(staticargs + (pargs,))):
                     os.write(wfd, '%d %s\n' % (i, item))
                 os._exit(0)
             except KeyboardInterrupt:
                 os._exit(255)
             # other exceptions are allowed to propagate, we rely
             # on lock.py's pid checks to avoid release callbacks
         pids.add(pid)
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
         signal.signal(signal.SIGCHLD, oldchldhandler)
         status = problem[0]
         if status:
             if status < 0:
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
         for line in util.iterfile(fp):
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
         killworkers()
         cleanup()
         raise
     cleanup()
 
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv
 
     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
 if os.name != 'nt':
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 
 def partition(lst, nslices):
     '''partition a list into N slices of roughly equal size
 
     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.
 
     mpm is not a fan of this partitioning strategy when files are involved.
     In his words:
 
         Single-threaded Mercurial makes a point of creating and visiting
         files in a fixed order (alphabetical). When creating files in order,
         a typical filesystem is likely to allocate them on nearby regions on
         disk. Thus, when revisiting in the same order, locality is maximized
         and various forms of OS and disk-level caching and read-ahead get a
         chance to work.
 
         This effect can be quite significant on spinning disks. I discovered
         it circa Mercurial v0.4 when revlogs were named by hashes of
         filenames. Tarring a repo and copying it to another disk effectively
         randomized the revlog ordering on disk by sorting the revlogs by hash
         and suddenly performance of my kernel checkout benchmark dropped by
         ~10x because the "working set" of sectors visited no longer fit in
         the drive's cache and the workload switched from streaming to random
         I/O.
 
         What we should really be doing is have workers read filenames from an
         ordered queue. This preserves locality and also keeps any worker from
         getting more than one file out of balance.
     '''
     for i in range(nslices):
         yield lst[i::nslices]
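
A quick sketch of how the worker() entry point is driven. This is
hypothetical caller code, not from the patch: the ui value stands in for a
real Mercurial ui object and sayhello is an invented task. func receives
staticargs plus one slice of args and yields (units, item) pairs, which
worker() re-yields as a progress iterator whether it ran inline or forked:

    def sayhello(prefix, chunk):
        # runs with one slice of args (in a forked child when parallel);
        # each yielded pair is (units of progress, payload string)
        for name in chunk:
            yield 1, '%s %s' % (prefix, name)

    # hypothetical: ui is assumed to be a real Mercurial ui instance
    for units, greeting in worker(ui, 0.001, sayhello, ('hello',),
                                  ['alice', 'bob', 'carol']):
        print(units, greeting)

Because _posixworker() serializes each pair over the pipe as '%d %s\n', the
payload must be a single-line string and the unit count an integer.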
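The change itself guards the window where sigchldhandler() and the blocking
waitforworkers() call in cleanup() race to reap the same child: whoever reaps
it first discards the pid, and the loser, seeing ECHILD from os.waitpid(),
now discards it too instead of leaving a stale entry in pids. Below is a
minimal POSIX-only illustration of that pattern, not Mercurial code: reap()
and the throwaway children are invented for the example, and Python 3 is
assumed (where os.waitpid() retries EINTR internally, so that branch of the
patched code is not needed here):

    import errno
    import os
    import signal

    pids = set()

    def reap(blocking):
        # same pattern as the patched waitforworkers(): whichever caller
        # notices an exited child first discards its pid; the other caller
        # sees ECHILD and discards it too, so no stale pid survives
        for pid in pids.copy():
            try:
                p, _ = os.waitpid(pid, 0 if blocking else os.WNOHANG)
            except OSError as e:
                if e.errno != errno.ECHILD:
                    raise
                # already reaped elsewhere, but still listed: drop it here
                pids.discard(pid)
                continue
            if p:
                pids.discard(p)

    # reap asynchronously, like sigchldhandler() does
    signal.signal(signal.SIGCHLD, lambda sig, frame: reap(blocking=False))

    for _ in range(4):
        pid = os.fork()
        if pid == 0:
            os._exit(0)      # child: exit immediately
        pids.add(pid)

    reap(blocking=True)      # main path, like cleanup() -> waitforworkers()
    assert not pids          # every pid was discarded by one path or the other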
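_posixexitstatus() folds the raw wait status into the convention os.spawnv
uses: a normal exit maps to its exit code, death by a signal to the negated
signal number; cleanup() relies on the sign to choose between sys.exit() and
re-raising the signal against itself. Assuming the traditional Linux status
layout (exit code in the high byte, signal number in the low byte):

    >>> _posixexitstatus(3 << 8)   # child exited with status 3
    3
    >>> _posixexitstatus(9)        # child killed by signal 9 (SIGKILL)
    -9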
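Finally, the every-Nth-element split that partition() performs is easiest to
see interactively:

    >>> list(partition(list(range(10)), 3))
    [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]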