worker: make waitforworkers reentrant...
Jun Wu
r30414:5069a8a4 default
@@ -1,188 +1,199 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import errno
 import os
 import signal
 import sys
 import threading
 
 from .i18n import _
 from . import (
     error,
     util,
 )
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
 
     # posix
     try:
         n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass
 
     # windows
     try:
         n = int(os.environ['NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass
 
     return 1
 
 def _numworkers(ui):
     s = ui.config('worker', 'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
 if os.name == 'posix':
     _startupcost = 0.01
 else:
     _startupcost = 1e30
 
 def worthwhile(ui, costperop, nops):
     '''try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them'''
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_startupcost * workers + linear / workers)
     return benefit >= 0.15
 
 def worker(ui, costperarg, func, staticargs, args):
     '''run a function, possibly in parallel in multiple worker
     processes.
 
     returns a progress iterator
 
     costperarg - cost of a single task
 
     func - function to run
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
     workers
     '''
     if worthwhile(ui, costperarg, len(args)):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
     rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
     def killworkers():
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
     def waitforworkers(blocking=True):
-        for pid in pids:
-            p, st = os.waitpid(pid, 0 if blocking else os.WNOHANG)
+        for pid in pids.copy():
+            p = st = 0
+            while True:
+                try:
+                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
+                except OSError as e:
+                    if e.errno == errno.EINTR:
+                        continue
+                    elif e.errno == errno.ECHILD:
+                        break # ignore ECHILD
+                    else:
+                        raise
             if p:
+                pids.remove(p)
                 st = _exitstatus(st)
                 if st and not problem[0]:
                     problem[0] = st
                     killworkers()
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
             signal.signal(signal.SIGINT, oldhandler)
             try:
                 os.close(rfd)
                 for i, item in func(*(staticargs + (pargs,))):
                     os.write(wfd, '%d %s\n' % (i, item))
                 os._exit(0)
             except KeyboardInterrupt:
                 os._exit(255)
                 # other exceptions are allowed to propagate, we rely
                 # on lock.py's pid checks to avoid release callbacks
         pids.add(pid)
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
     t = threading.Thread(target=waitforworkers)
     t.start()
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         t.join()
         status = problem[0]
         if status:
             if status < 0:
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
         for line in util.iterfile(fp):
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
         killworkers()
         cleanup()
         raise
     cleanup()
 
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv
 
     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
 if os.name != 'nt':
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 
 def partition(lst, nslices):
     '''partition a list into N slices of roughly equal size
 
     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.
 
     mpm is not a fan of this partitioning strategy when files are involved.
     In his words:
 
         Single-threaded Mercurial makes a point of creating and visiting
         files in a fixed order (alphabetical). When creating files in order,
         a typical filesystem is likely to allocate them on nearby regions on
         disk. Thus, when revisiting in the same order, locality is maximized
         and various forms of OS and disk-level caching and read-ahead get a
         chance to work.
 
         This effect can be quite significant on spinning disks. I discovered it
         circa Mercurial v0.4 when revlogs were named by hashes of filenames.
         Tarring a repo and copying it to another disk effectively randomized
         the revlog ordering on disk by sorting the revlogs by hash and suddenly
         performance of my kernel checkout benchmark dropped by ~10x because the
         "working set" of sectors visited no longer fit in the drive's cache and
         the workload switched from streaming to random I/O.
 
         What we should really be doing is have workers read filenames from a
         ordered queue. This preserves locality and also keeps any worker from
         getting more than one file out of balance.
     '''
     for i in range(nslices):
         yield lst[i::nslices]
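For context on the unchanged worthwhile() heuristic in the diff above: it compares the cost of doing all operations serially (linear) against the cost of starting the workers plus the parallelised runtime, and only forks when the estimated saving clears a fixed threshold. A worked illustration with made-up numbers (not taken from the commit):

    # hypothetical inputs, mirroring the benefit formula in worthwhile()
    costperop, nops, workers, _startupcost = 0.001, 1000, 4, 0.01
    linear = costperop * nops                    # 1.0 unit of serial work
    benefit = linear - (_startupcost * workers + linear / workers)
    print(benefit >= 0.15)                       # 1.0 - (0.04 + 0.25) = 0.71 -> True, forking pays off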
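The heart of the change is the new wait loop in waitforworkers(): iterate over a copy of the pid set, retry waitpid() when it is interrupted by a signal (EINTR), tolerate ECHILD in case the child has already been reaped elsewhere, and remove only successfully reaped pids from the shared set. A minimal standalone sketch of that retry pattern, assuming a POSIX system (the helper name reapone is hypothetical, not part of the patch):

    import errno
    import os

    def reapone(pid, blocking=True):
        """wait for pid, retrying on EINTR and tolerating ECHILD (already reaped)"""
        while True:
            try:
                return os.waitpid(pid, 0 if blocking else os.WNOHANG)
            except OSError as e:
                if e.errno == errno.EINTR:
                    continue           # interrupted by a signal: just retry
                if e.errno == errno.ECHILD:
                    return (0, 0)      # child already reaped (e.g. by a SIGCHLD handler)
                raise

    pid = os.fork()
    if pid == 0:
        os._exit(0)                    # trivial child
    print(reapone(pid))                # (pid, status), or (0, 0) if already reaped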
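partition() at the bottom of the file deals the argument list out round-robin, which is what the docstring's locality caveat is about; a quick illustration of the slicing it performs:

    lst = list(range(10))
    print([lst[i::3] for i in range(3)])   # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]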