##// END OF EJS Templates
worker: use os._exit for posix worker in all cases...
Jun Wu -
r30521:86cd09bc default
parent child Browse files
Show More
@@ -1,210 +1,222 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 error,
18 scmutil,
18 19 util,
19 20 )
20 21
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: sysconf raises ValueError for an unknown name and is absent
    # (AttributeError) on platforms without it
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # windows: the environment variable may be missing or non-numeric
    try:
        ncpus = int(os.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # conservative fallback when nothing could be detected
    return 1
41 42
42 43 def _numworkers(ui):
43 44 s = ui.config('worker', 'numcpus')
44 45 if s:
45 46 try:
46 47 n = int(s)
47 48 if n >= 1:
48 49 return n
49 50 except ValueError:
50 51 raise error.Abort(_('number of cpus must be an integer'))
51 52 return min(max(countcpus(), 4), 32)
52 53
53 54 if os.name == 'posix':
54 55 _startupcost = 0.01
55 56 else:
56 57 _startupcost = 1e30
57 58
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # parallel cost = per-worker startup overhead + the work divided
    # evenly among the workers
    parallelcost = _startupcost * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
65 66
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    # fall back to a single in-process call when parallelism would not
    # pay for the worker startup cost
    if not worthwhile(ui, costperarg, len(args)):
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args)
84 85
def _posixworker(ui, func, staticargs, args):
    '''fork()-based worker implementation for posix platforms

    Forks one child per partition of args; each child runs func and
    streams '%d %s\n'-formatted (index, item) results back through a
    pipe.  Yields the parsed (int, str) pairs as they arrive.  If any
    child exits non-zero, the remaining children are killed and the
    parent exits (or re-raises the fatal signal) via cleanup().
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # the parent ignores SIGINT while children are running; children
    # restore the original handler after fork (below)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so the nested closures can mutate it
    # (no 'nonlocal' in Python 2)
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: the child already exited; anything else is real
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # iterate over a copy: the set may be mutated (by this function
        # or by a re-entrant SIGCHLD invocation) while we wait
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # the child has already been reaped, but pids has
                        # not yet been updated (maybe we were interrupted
                        # just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            # p == 0 means WNOHANG found the child still running
            if p:
                pids.discard(p)
                st = _exitstatus(st)
                # remember only the first failure
                if st and not problem[0]:
                    problem[0] = st
    def sigchldhandler(signum, frame):
        # non-blocking reap from the signal handler; on first failure,
        # take down the remaining workers immediately
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            # child: restore the handlers the parent replaced
            signal.signal(signal.SIGINT, oldhandler)
            signal.signal(signal.SIGCHLD, oldchldhandler)

            def workerfunc():
                # the child only writes; close the read end
                os.close(rfd)
                # NOTE(review): str is passed to os.write — fine on
                # Python 2, would need bytes on Python 3
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))

            # make sure we use os._exit in all code paths. otherwise the worker
            # may do some clean-ups which could cause surprises like deadlock.
            # see sshpeer.cleanup for example.
            try:
                scmutil.callcatch(ui, workerfunc)
            except KeyboardInterrupt:
                os._exit(255)
            except: # never return, therefore no re-raises
                try:
                    ui.traceback()
                finally:
                    os._exit(255)
            else:
                os._exit(0)
        pids.add(pid)
    # parent: only reads; close the write end so EOF is seen when all
    # children are done
    os.close(wfd)
    fp = os.fdopen(rfd, 'rb', 0)
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                # child died from a signal: re-raise the same signal in
                # the parent so our caller sees the same fate
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            # strip the trailing newline from the item
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
165 177
166 178 def _posixexitstatus(code):
167 179 '''convert a posix exit status into the same form returned by
168 180 os.spawnv
169 181
170 182 returns None if the process was stopped instead of exiting'''
171 183 if os.WIFEXITED(code):
172 184 return os.WEXITSTATUS(code)
173 185 elif os.WIFSIGNALED(code):
174 186 return -os.WTERMSIG(code)
175 187
# Select the platform-specific implementations: every non-Windows
# platform gets the fork()-based posix worker.  (No Windows
# implementation is visible in this file at this point — presumably
# worker() is only reached on posix, or Windows support is defined
# elsewhere; verify before relying on it.)
if os.name != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
179 191
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    # slice k gets elements k, k+nslices, k+2*nslices, ...
    return (lst[offset::nslices] for offset in range(nslices))
General Comments 0
You need to be logged in to leave comments. Login now