worker: use os._exit for posix worker in all cases...
Jun Wu
r30521:86cd09bc default
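The change routes the worker body through scmutil.callcatch and guarantees the forked child terminates with os._exit() on every path, so it never unwinds into cleanup logic inherited from the parent (see the sshpeer.cleanup reference in the patch comment below). As a minimal standalone sketch of the hazard being avoided, not Mercurial code, using only the stdlib on a POSIX system:

import atexit
import os
import sys

def cleanup():
    sys.stdout.write('cleanup in pid %d\n' % os.getpid())

# the parent registers the hook; a fork()ed child inherits it
atexit.register(cleanup)

pid = os.fork()
if pid == 0:
    # exiting the child via sys.exit(0) would unwind the interpreter and
    # run the inherited atexit hook a second time, in the child;
    # os._exit skips interpreter shutdown entirely
    os._exit(0)
os.waitpid(pid, 0)
# 'cleanup in pid ...' is printed exactly once, by the parent, at exit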
@@ -1,210 +1,222 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import errno
 import os
 import signal
 import sys
 
 from .i18n import _
 from . import (
     error,
+    scmutil,
     util,
 )
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
 
     # posix
     try:
         n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass
 
     # windows
     try:
         n = int(os.environ['NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass
 
     return 1
 
 def _numworkers(ui):
     s = ui.config('worker', 'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
 if os.name == 'posix':
     _startupcost = 0.01
 else:
     _startupcost = 1e30
 
 def worthwhile(ui, costperop, nops):
     '''try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them'''
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_startupcost * workers + linear / workers)
     return benefit >= 0.15
 
 def worker(ui, costperarg, func, staticargs, args):
     '''run a function, possibly in parallel in multiple worker
     processes.
 
     returns a progress iterator
 
     costperarg - cost of a single task
 
     func - function to run
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
     workers
     '''
     if worthwhile(ui, costperarg, len(args)):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
     rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
     def killworkers():
         # unregister SIGCHLD handler as all children will be killed. This
         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
         # could be updated while iterating, which would cause inconsistency.
         signal.signal(signal.SIGCHLD, oldchldhandler)
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
     def waitforworkers(blocking=True):
         for pid in pids.copy():
             p = st = 0
             while True:
                 try:
                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                     break
                 except OSError as e:
                     if e.errno == errno.EINTR:
                         continue
                     elif e.errno == errno.ECHILD:
                         # the child was already reaped, but pids has not yet
                         # been updated (maybe interrupted just after waitpid)
                         pids.discard(pid)
                         break
                     else:
                         raise
             if p:
                 pids.discard(p)
                 st = _exitstatus(st)
                 if st and not problem[0]:
                     problem[0] = st
     def sigchldhandler(signum, frame):
         waitforworkers(blocking=False)
         if problem[0]:
             killworkers()
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
             signal.signal(signal.SIGINT, oldhandler)
             signal.signal(signal.SIGCHLD, oldchldhandler)
-            try:
+
+            def workerfunc():
                 os.close(rfd)
                 for i, item in func(*(staticargs + (pargs,))):
                     os.write(wfd, '%d %s\n' % (i, item))
-                os._exit(0)
+
+            # make sure we use os._exit in all code paths. otherwise the worker
+            # may do some clean-ups which could cause surprises like deadlock.
+            # see sshpeer.cleanup for example.
+            try:
+                scmutil.callcatch(ui, workerfunc)
             except KeyboardInterrupt:
                 os._exit(255)
-            # other exceptions are allowed to propagate, we rely
-            # on lock.py's pid checks to avoid release callbacks
+            except: # never return, therefore no re-raises
+                try:
+                    ui.traceback()
+                finally:
+                    os._exit(255)
+            else:
+                os._exit(0)
         pids.add(pid)
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
         signal.signal(signal.SIGCHLD, oldchldhandler)
         status = problem[0]
         if status:
             if status < 0:
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
         for line in util.iterfile(fp):
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
         killworkers()
         cleanup()
         raise
     cleanup()
 
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv
 
     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
 if os.name != 'nt':
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 
 def partition(lst, nslices):
     '''partition a list into N slices of roughly equal size
 
     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.
 
     mpm is not a fan of this partitioning strategy when files are involved.
     In his words:
 
     Single-threaded Mercurial makes a point of creating and visiting
     files in a fixed order (alphabetical). When creating files in order,
     a typical filesystem is likely to allocate them on nearby regions on
     disk. Thus, when revisiting in the same order, locality is maximized
     and various forms of OS and disk-level caching and read-ahead get a
     chance to work.
 
     This effect can be quite significant on spinning disks. I discovered it
     circa Mercurial v0.4 when revlogs were named by hashes of filenames.
     Tarring a repo and copying it to another disk effectively randomized
     the revlog ordering on disk by sorting the revlogs by hash and suddenly
     performance of my kernel checkout benchmark dropped by ~10x because the
     "working set" of sectors visited no longer fit in the drive's cache and
     the workload switched from streaming to random I/O.
 
     What we should really be doing is have workers read filenames from an
     ordered queue. This preserves locality and also keeps any worker from
     getting more than one file out of balance.
     '''
     for i in range(nslices):
         yield lst[i::nslices]
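As a quick illustration of the strided strategy the partition docstring describes (here with seven hypothetical items split into three slices), each slice takes every Nth element and preserves the original order within itself:

>>> list(partition(['a', 'b', 'c', 'd', 'e', 'f', 'g'], 3))
[['a', 'd', 'g'], ['b', 'e'], ['c', 'f']]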
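For reference, a worked example of the worthwhile() heuristic above. The task counts and per-task cost are illustrative assumptions, not figures from the patch; 0.01 is the posix _startupcost:

costperop, nops, workers = 0.0001, 10000, 8   # assumed: 10000 tasks, 0.0001s each
linear = costperop * nops                     # 1.0s of serial work
parallel = 0.01 * workers + linear / workers  # 0.08s startup + 0.125s per worker
benefit = linear - parallel                   # 0.795 >= 0.15: fan out to workers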