##// END OF EJS Templates
py3: pass str in os.sysconf()...
Pulkit Goyal -
r32611:95448993 default
parent child Browse files
Show More
@@ -1,240 +1,240
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14
14
15 from .i18n import _
15 from .i18n import _
16 from . import (
16 from . import (
17 encoding,
17 encoding,
18 error,
18 error,
19 pycompat,
19 pycompat,
20 scmutil,
20 scmutil,
21 util,
21 util,
22 )
22 )
23
23
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: sysconf reports the number of online processors. The key is
    # passed as a native str (r'' prefix) so it works on both Python 2 and 3.
    try:
        ncpus = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # windows: the processor count is advertised via the environment
    try:
        ncpus = int(encoding.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # conservative fallback when neither mechanism is available
    return 1
44
44
45 def _numworkers(ui):
45 def _numworkers(ui):
46 s = ui.config('worker', 'numcpus')
46 s = ui.config('worker', 'numcpus')
47 if s:
47 if s:
48 try:
48 try:
49 n = int(s)
49 n = int(s)
50 if n >= 1:
50 if n >= 1:
51 return n
51 return n
52 except ValueError:
52 except ValueError:
53 raise error.Abort(_('number of cpus must be an integer'))
53 raise error.Abort(_('number of cpus must be an integer'))
54 return min(max(countcpus(), 4), 32)
54 return min(max(countcpus(), 4), 32)
55
55
# estimated cost (in seconds) of starting one worker process: cheap fork on
# posix, effectively infinite elsewhere so parallelism is never chosen
_startupcost = 0.01 if pycompat.osname == 'posix' else 1e30
60
60
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    nworkers = _numworkers(ui)
    # time to do all the work in a single process
    sequentialcost = costperop * nops
    # time to spawn the workers plus the per-worker share of the work
    parallelcost = _startupcost * nworkers + sequentialcost / nworkers
    # only parallelize when the expected saving is at least 0.15s
    return sequentialcost - parallelcost >= 0.15
68
68
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if not worthwhile(ui, costperarg, len(args)):
        # cheap enough to just run serially in this process
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args)
87
87
def _posixworker(ui, func, staticargs, args):
    '''fork one worker per partition of args and yield their results

    Results are streamed back to the parent over a pipe as "%d %s\n" lines
    and re-yielded as (int, str) pairs. The first failing worker's exit
    status is propagated: the remaining workers are killed and the parent
    either re-raises the signal on itself or sys.exit()s with the status.
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # ignore SIGINT while forking so only the parent's cleanup path handles
    # it; the original handler is restored in each child and in cleanup()
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a 1-element list so the nested closures can mutate it
    # (py2-compatible substitute for "nonlocal")
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # reap finished children; records the first nonzero exit status in
        # problem[0]. Iterates a copy because pids may shrink underneath us.
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st
    def sigchldhandler(signum, frame):
        # non-blocking reap from the signal handler; on the first failure,
        # take down the remaining workers immediately
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    # flush before fork so buffered output isn't duplicated in children
    ui.flush()
    parentpid = os.getpid()
    for pargs in partition(args, workers):
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # child: restore the handlers the parent overrode above
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # child only writes; close the read end of the pipe
                    os.close(rfd)
                    for i, item in func(*(staticargs + (pargs,))):
                        os.write(wfd, '%d %s\n' % (i, item))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except: # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except: # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    # parent only reads; close the write end so EOF arrives when all
    # children have exited
    os.close(wfd)
    fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
    def cleanup():
        # restore signal handlers, reap all children, then propagate the
        # first failure (re-raise the signal on ourselves, or exit)
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                # negative status means "killed by signal -status"
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            # each line is "<index> <item>\n"; strip the trailing newline
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
195
195
196 def _posixexitstatus(code):
196 def _posixexitstatus(code):
197 '''convert a posix exit status into the same form returned by
197 '''convert a posix exit status into the same form returned by
198 os.spawnv
198 os.spawnv
199
199
200 returns None if the process was stopped instead of exiting'''
200 returns None if the process was stopped instead of exiting'''
201 if os.WIFEXITED(code):
201 if os.WIFEXITED(code):
202 return os.WEXITSTATUS(code)
202 return os.WEXITSTATUS(code)
203 elif os.WIFSIGNALED(code):
203 elif os.WIFSIGNALED(code):
204 return -os.WTERMSIG(code)
204 return -os.WTERMSIG(code)
205
205
# select the platform implementation; only posix has a fork-based worker
# (there is no _platformworker defined here for Windows, where the huge
# _startupcost prevents worthwhile() from ever choosing parallelism)
if pycompat.osname != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
209
209
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input, i.e. it
    yields lst[0::N], lst[1::N], ... lst[N-1::N]. If we ever write workers
    that need to preserve grouping in input we should consider allowing
    callers to specify a partition strategy.

    Caveat (per mpm): Mercurial deliberately creates and visits files in a
    fixed alphabetical order, which tends to place them in nearby disk
    regions, so revisiting them in the same order maximizes locality and
    lets OS/disk caching and read-ahead work. On spinning disks this effect
    can be dramatic (~10x observed circa v0.4 when revlog ordering on disk
    was effectively randomized). Striding through the input as done here
    destroys that ordering. What we should really be doing is have workers
    read filenames from an ordered queue, which preserves locality and also
    keeps any worker from getting more than one file out of balance.
    '''
    offset = 0
    while offset < nslices:
        yield lst[offset::nslices]
        offset += 1
General Comments 0
You need to be logged in to leave comments. Login now