worker: stop using a separate thread waiting for children...
Jun Wu
r30416:c27614f2 default
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -1,207 +1,204 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import errno
 import os
 import signal
 import sys
-import threading
 
 from .i18n import _
 from . import (
     error,
     util,
 )
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
 
     # posix
     try:
         n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass
 
     # windows
     try:
         n = int(os.environ['NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass
 
     return 1
 
 def _numworkers(ui):
     s = ui.config('worker', 'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
 if os.name == 'posix':
     _startupcost = 0.01
 else:
     _startupcost = 1e30
 
 def worthwhile(ui, costperop, nops):
     '''try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them'''
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_startupcost * workers + linear / workers)
     return benefit >= 0.15
 
 def worker(ui, costperarg, func, staticargs, args):
     '''run a function, possibly in parallel in multiple worker
     processes.
 
     returns a progress iterator
 
     costperarg - cost of a single task
 
     func - function to run
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
     workers
     '''
     if worthwhile(ui, costperarg, len(args)):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
     rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
     def killworkers():
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
     def waitforworkers(blocking=True):
         for pid in pids.copy():
             p = st = 0
             while True:
                 try:
                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                     break
                 except OSError as e:
                     if e.errno == errno.EINTR:
                         continue
                     elif e.errno == errno.ECHILD:
                         break # ignore ECHILD
                     else:
                         raise
             if p:
                 pids.remove(p)
                 st = _exitstatus(st)
                 if st and not problem[0]:
                     problem[0] = st
                     # unregister SIGCHLD handler as all children will be killed
                     signal.signal(signal.SIGCHLD, oldchldhandler)
                     killworkers()
     def sigchldhandler(signum, frame):
         waitforworkers(blocking=False)
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
             signal.signal(signal.SIGINT, oldhandler)
             signal.signal(signal.SIGCHLD, oldchldhandler)
             try:
                 os.close(rfd)
                 for i, item in func(*(staticargs + (pargs,))):
                     os.write(wfd, '%d %s\n' % (i, item))
                 os._exit(0)
             except KeyboardInterrupt:
                 os._exit(255)
                 # other exceptions are allowed to propagate, we rely
                 # on lock.py's pid checks to avoid release callbacks
         pids.add(pid)
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
-    t = threading.Thread(target=waitforworkers)
-    t.start()
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
-        t.join()
+        waitforworkers()
         signal.signal(signal.SIGCHLD, oldchldhandler)
         status = problem[0]
         if status:
             if status < 0:
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
         for line in util.iterfile(fp):
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
         killworkers()
         cleanup()
         raise
     cleanup()
 
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv
 
     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
 if os.name != 'nt':
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 
 def partition(lst, nslices):
     '''partition a list into N slices of roughly equal size
 
     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.
 
     mpm is not a fan of this partitioning strategy when files are involved.
     In his words:
 
         Single-threaded Mercurial makes a point of creating and visiting
         files in a fixed order (alphabetical). When creating files in order,
         a typical filesystem is likely to allocate them on nearby regions on
         disk. Thus, when revisiting in the same order, locality is maximized
         and various forms of OS and disk-level caching and read-ahead get a
         chance to work.
 
         This effect can be quite significant on spinning disks. I discovered it
         circa Mercurial v0.4 when revlogs were named by hashes of filenames.
         Tarring a repo and copying it to another disk effectively randomized
         the revlog ordering on disk by sorting the revlogs by hash and suddenly
         performance of my kernel checkout benchmark dropped by ~10x because the
         "working set" of sectors visited no longer fit in the drive's cache and
         the workload switched from streaming to random I/O.
 
         What we should really be doing is have workers read filenames from a
         ordered queue. This preserves locality and also keeps any worker from
         getting more than one file out of balance.
     '''
     for i in range(nslices):
         yield lst[i::nslices]
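
The core of this change: the parent no longer dedicates a thread to a blocking wait on its children. The SIGCHLD handler reaps exited workers as they die via a non-blocking waitpid(), and cleanup() makes one final blocking pass instead of joining the waiter thread. Below is a minimal, POSIX-only sketch of that pattern; the names (reap, sigchld, oldchld) are illustrative, not Mercurial's API, and the error handling is coarser than worker.py's.

import os
import signal

pids = set()

def reap(blocking=True):
    # iterate over a copy: the signal handler may fire mid-loop and mutate pids
    for pid in pids.copy():
        try:
            p, st = os.waitpid(pid, 0 if blocking else os.WNOHANG)
        except OSError:
            # most likely ECHILD: this child was already reaped elsewhere
            pids.discard(pid)
            continue
        if p:  # p == 0 means the child has not exited yet
            pids.discard(p)

def sigchld(signum, frame):
    reap(blocking=False)  # must never block inside a signal handler

oldchld = signal.signal(signal.SIGCHLD, sigchld)
for _ in range(4):
    pid = os.fork()
    if pid == 0:
        # child: restore the inherited handler, mirroring what the diff does
        signal.signal(signal.SIGCHLD, oldchld)
        os._exit(0)
    pids.add(pid)

reap()  # final blocking pass, the analogue of cleanup() calling waitforworkers()
signal.signal(signal.SIGCHLD, oldchld)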
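As an aside on the partitioning strategy the docstring debates: the stride slicing interleaves the input across workers rather than handing each worker a contiguous chunk, which is exactly why alphabetical neighbors end up in different processes. A quick illustration, reusing partition() from the diff with made-up inputs:

def partition(lst, nslices):
    for i in range(nslices):
        yield lst[i::nslices]

# interleaved, not contiguous: adjacent items land in different slices
print(list(partition(list(range(10)), 3)))
# -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]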