py3: pass str in os.sysconf()...
Pulkit Goyal
r32611:95448993 default
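The one-line change below is in countcpus(): the 'SC_NPROCESSORS_ONLN' literal gains an r'' prefix. Mercurial's Python 3 source loader rewrites unprefixed string literals to bytes, while os.sysconf() accepts only a native str (or int) name, so the prefix keeps the argument a str on both Python 2 and 3. A minimal standalone sketch of the call being fixed (plain Python, not Mercurial code):

    import os

    # os.sysconf() resolves the name through os.sysconf_names, whose keys are
    # str; on Python 3 a bytes name is rejected, so the literal must stay str.
    try:
        n = os.sysconf('SC_NPROCESSORS_ONLN')
        if n > 0:
            print('online CPUs:', n)
    except (AttributeError, ValueError):
        pass  # os.sysconf or this name may be missing (e.g. on Windows)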
@@ -1,240 +1,240 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 encoding,
18 18 error,
19 19 pycompat,
20 20 scmutil,
21 21 util,
22 22 )
23 23
24 24 def countcpus():
25 25 '''try to count the number of CPUs on the system'''
26 26
27 27 # posix
28 28 try:
29 - n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
29 + n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
30 30 if n > 0:
31 31 return n
32 32 except (AttributeError, ValueError):
33 33 pass
34 34
35 35 # windows
36 36 try:
37 37 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
38 38 if n > 0:
39 39 return n
40 40 except (KeyError, ValueError):
41 41 pass
42 42
43 43 return 1
44 44
45 45 def _numworkers(ui):
46 46 s = ui.config('worker', 'numcpus')
47 47 if s:
48 48 try:
49 49 n = int(s)
50 50 if n >= 1:
51 51 return n
52 52 except ValueError:
53 53 raise error.Abort(_('number of cpus must be an integer'))
54 54 return min(max(countcpus(), 4), 32)
55 55
56 56 if pycompat.osname == 'posix':
57 57 _startupcost = 0.01
58 58 else:
59 59 _startupcost = 1e30
60 60
61 61 def worthwhile(ui, costperop, nops):
62 62 '''try to determine whether the benefit of multiple processes can
63 63 outweigh the cost of starting them'''
64 64 linear = costperop * nops
65 65 workers = _numworkers(ui)
66 66 benefit = linear - (_startupcost * workers + linear / workers)
67 67 return benefit >= 0.15
68 68
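As a rough illustration of the worthwhile() heuristic above (the numbers are invented; only the POSIX _startupcost of 0.01 comes from this file), a sketch of the benefit calculation with an assumed 4 workers:

    def benefit(costperop, nops, workers=4, startupcost=0.01):
        linear = costperop * nops
        return linear - (startupcost * workers + linear / workers)

    benefit(0.001, 1000)  # 0.71  >= 0.15 -> fork workers
    benefit(0.001, 100)   # 0.035 <  0.15 -> stay single-process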
69 69 def worker(ui, costperarg, func, staticargs, args):
70 70 '''run a function, possibly in parallel in multiple worker
71 71 processes.
72 72
73 73 returns a progress iterator
74 74
75 75 costperarg - cost of a single task
76 76
77 77 func - function to run
78 78
79 79 staticargs - arguments to pass to every invocation of the function
80 80
81 81 args - arguments to split into chunks, to pass to individual
82 82 workers
83 83 '''
84 84 if worthwhile(ui, costperarg, len(args)):
85 85 return _platformworker(ui, func, staticargs, args)
86 86 return func(*staticargs + (args,))
87 87
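The docstring above spells out the calling convention; a hypothetical caller could look like the sketch below (checkone, repo, opts and the cost of 0.05 are illustrative, not part of worker.py). func must yield (int, str) pairs, since the parallel path writes each one to the pipe as '%d %s\n', so items must not contain newlines:

    def checkone(repo, opts, paths):
        for p in paths:
            # ... per-path work here ...
            yield 1, p               # (progress units, item)

    done = 0
    for n, path in worker.worker(repo.ui, 0.05, checkone, (repo, opts), paths):
        done += n                    # counts arrive per yielded pair; accumulate

Callers inside Mercurial typically feed these (count, item) pairs into ui.progress().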
88 88 def _posixworker(ui, func, staticargs, args):
89 89 rfd, wfd = os.pipe()
90 90 workers = _numworkers(ui)
91 91 oldhandler = signal.getsignal(signal.SIGINT)
92 92 signal.signal(signal.SIGINT, signal.SIG_IGN)
93 93 pids, problem = set(), [0]
94 94 def killworkers():
95 95 # unregister SIGCHLD handler as all children will be killed. This
96 96 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
97 97 # could be updated while iterating, which would cause inconsistency.
98 98 signal.signal(signal.SIGCHLD, oldchldhandler)
99 99 # if one worker bails, there's no good reason to wait for the rest
100 100 for p in pids:
101 101 try:
102 102 os.kill(p, signal.SIGTERM)
103 103 except OSError as err:
104 104 if err.errno != errno.ESRCH:
105 105 raise
106 106 def waitforworkers(blocking=True):
107 107 for pid in pids.copy():
108 108 p = st = 0
109 109 while True:
110 110 try:
111 111 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
112 112 break
113 113 except OSError as e:
114 114 if e.errno == errno.EINTR:
115 115 continue
116 116 elif e.errno == errno.ECHILD:
117 117 # child may already have been reaped, but pids not yet
118 118 # updated (maybe interrupted just after waitpid)
119 119 pids.discard(pid)
120 120 break
121 121 else:
122 122 raise
123 123 if not p:
124 124 # skip subsequent steps, because child process should
125 125 # be still running in this case
126 126 continue
127 127 pids.discard(p)
128 128 st = _exitstatus(st)
129 129 if st and not problem[0]:
130 130 problem[0] = st
131 131 def sigchldhandler(signum, frame):
132 132 waitforworkers(blocking=False)
133 133 if problem[0]:
134 134 killworkers()
135 135 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
136 136 ui.flush()
137 137 parentpid = os.getpid()
138 138 for pargs in partition(args, workers):
139 139 # make sure we use os._exit in all worker code paths. otherwise the
140 140 # worker may do some clean-ups which could cause surprises like
141 141 # deadlock. see sshpeer.cleanup for example.
142 142 # override error handling *before* fork. this is necessary because
143 143 # exception (signal) may arrive after fork, before "pid =" assignment
144 144 # completes, and other exception handler (dispatch.py) can lead to
145 145 # unexpected code path without os._exit.
146 146 ret = -1
147 147 try:
148 148 pid = os.fork()
149 149 if pid == 0:
150 150 signal.signal(signal.SIGINT, oldhandler)
151 151 signal.signal(signal.SIGCHLD, oldchldhandler)
152 152
153 153 def workerfunc():
154 154 os.close(rfd)
155 155 for i, item in func(*(staticargs + (pargs,))):
156 156 os.write(wfd, '%d %s\n' % (i, item))
157 157 return 0
158 158
159 159 ret = scmutil.callcatch(ui, workerfunc)
160 160 except: # parent re-raises, child never returns
161 161 if os.getpid() == parentpid:
162 162 raise
163 163 exctype = sys.exc_info()[0]
164 164 force = not issubclass(exctype, KeyboardInterrupt)
165 165 ui.traceback(force=force)
166 166 finally:
167 167 if os.getpid() != parentpid:
168 168 try:
169 169 ui.flush()
170 170 except: # never returns, no re-raises
171 171 pass
172 172 finally:
173 173 os._exit(ret & 255)
174 174 pids.add(pid)
175 175 os.close(wfd)
176 176 fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
177 177 def cleanup():
178 178 signal.signal(signal.SIGINT, oldhandler)
179 179 waitforworkers()
180 180 signal.signal(signal.SIGCHLD, oldchldhandler)
181 181 status = problem[0]
182 182 if status:
183 183 if status < 0:
184 184 os.kill(os.getpid(), -status)
185 185 sys.exit(status)
186 186 try:
187 187 for line in util.iterfile(fp):
188 188 l = line.split(' ', 1)
189 189 yield int(l[0]), l[1][:-1]
190 190 except: # re-raises
191 191 killworkers()
192 192 cleanup()
193 193 raise
194 194 cleanup()
195 195
196 196 def _posixexitstatus(code):
197 197 '''convert a posix exit status into the same form returned by
198 198 os.spawnv
199 199
200 200 returns None if the process was stopped instead of exiting'''
201 201 if os.WIFEXITED(code):
202 202 return os.WEXITSTATUS(code)
203 203 elif os.WIFSIGNALED(code):
204 204 return -os.WTERMSIG(code)
205 205
206 206 if pycompat.osname != 'nt':
207 207 _platformworker = _posixworker
208 208 _exitstatus = _posixexitstatus
209 209
210 210 def partition(lst, nslices):
211 211 '''partition a list into N slices of roughly equal size
212 212
213 213 The current strategy takes every Nth element from the input. If
214 214 we ever write workers that need to preserve grouping in input
215 215 we should consider allowing callers to specify a partition strategy.
216 216
217 217 mpm is not a fan of this partitioning strategy when files are involved.
218 218 In his words:
219 219
220 220 Single-threaded Mercurial makes a point of creating and visiting
221 221 files in a fixed order (alphabetical). When creating files in order,
222 222 a typical filesystem is likely to allocate them on nearby regions on
223 223 disk. Thus, when revisiting in the same order, locality is maximized
224 224 and various forms of OS and disk-level caching and read-ahead get a
225 225 chance to work.
226 226
227 227 This effect can be quite significant on spinning disks. I discovered it
228 228 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
229 229 Tarring a repo and copying it to another disk effectively randomized
230 230 the revlog ordering on disk by sorting the revlogs by hash and suddenly
231 231 performance of my kernel checkout benchmark dropped by ~10x because the
232 232 "working set" of sectors visited no longer fit in the drive's cache and
233 233 the workload switched from streaming to random I/O.
234 234
235 235 What we should really be doing is to have workers read filenames from an
236 236 ordered queue. This preserves locality and also keeps any worker from
237 237 getting more than one file out of balance.
238 238 '''
239 239 for i in range(nslices):
240 240 yield lst[i::nslices]
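For reference, the round-robin split described in the docstring behaves like this (illustrative input):

    list(partition(list(range(10)), 3))
    # -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]

Each slice takes every Nth element, so consecutive filenames end up on different workers, which is exactly the locality trade-off discussed above.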