##// END OF EJS Templates
worker: allow waitforworkers to be non-blocking...
Jun Wu -
r30412:7bc25549 default
parent child Browse files
Show More
@@ -1,187 +1,189 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15
15
16 from .i18n import _
16 from .i18n import _
17 from . import (
17 from . import (
18 error,
18 error,
19 util,
19 util,
20 )
20 )
21
21
def countcpus():
    '''try to count the number of CPUs on the system'''

    # POSIX: sysconf exposes the number of processors currently online.
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if ncpus > 0:
            return ncpus
    except (AttributeError, ValueError):
        pass

    # Windows: the count is published through the environment.
    try:
        ncpus = int(os.environ['NUMBER_OF_PROCESSORS'])
        if ncpus > 0:
            return ncpus
    except (KeyError, ValueError):
        pass

    # Detection failed; assume a single CPU.
    return 1
42
42
def _numworkers(ui):
    '''number of worker processes to use

    The 'worker.numcpus' config knob wins when set to a positive
    integer; otherwise the detected CPU count is used, clamped to the
    range [4, 32].
    '''
    configured = ui.config('worker', 'numcpus')
    if configured:
        try:
            count = int(configured)
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
        if count >= 1:
            return count
    return min(max(countcpus(), 4), 32)
53
53
54 if os.name == 'posix':
54 if os.name == 'posix':
55 _startupcost = 0.01
55 _startupcost = 0.01
56 else:
56 else:
57 _startupcost = 1e30
57 _startupcost = 1e30
58
58
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    # cost of doing everything serially in this process
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # cost of spawning the workers plus the per-worker share of the work
    parallelcost = _startupcost * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
66
66
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    # When parallelism is not expected to pay off, just run serially.
    if not worthwhile(ui, costperarg, len(args)):
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args)
85
85
def _posixworker(ui, func, staticargs, args):
    '''fork()-based worker implementation for POSIX systems

    Forks one child per partition of args; children stream
    'index item\\n' records back over a pipe, which this generator
    yields as (index, item) tuples.
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # ignore SIGINT in the master while children run; children restore
    # the previous handler so ^C reaches them
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so the nested closures can mutate it
    pids, problem = [], [0]
    def killworkers():
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # the child may already be gone; that is fine
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # reap children; with blocking=False, WNOHANG makes waitpid
        # return (0, 0) for children that have not exited yet
        for pid in pids:
            p, st = os.waitpid(pid, 0 if blocking else os.WNOHANG)
            if p:
                st = _exitstatus(st)
                if st and not problem[0]:
                    # remember the first failure and stop the others
                    problem[0] = st
                    killworkers()
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            # child: restore SIGINT, run our slice, report via the pipe
            signal.signal(signal.SIGINT, oldhandler)
            try:
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                os._exit(0)
            except KeyboardInterrupt:
                os._exit(255)
            # other exceptions are allowed to propagate, we rely
            # on lock.py's pid checks to avoid release callbacks
        pids.append(pid)
    pids.reverse()
    os.close(wfd)
    # unbuffered read end so partial lines are not held back
    fp = os.fdopen(rfd, 'rb', 0)
    # reap children on a thread so the generator below can keep yielding
    t = threading.Thread(target=waitforworkers)
    t.start()
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        t.join()
        status = problem[0]
        if status:
            if status < 0:
                # child died from a signal: re-raise it in the master
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
142
144
143 def _posixexitstatus(code):
145 def _posixexitstatus(code):
144 '''convert a posix exit status into the same form returned by
146 '''convert a posix exit status into the same form returned by
145 os.spawnv
147 os.spawnv
146
148
147 returns None if the process was stopped instead of exiting'''
149 returns None if the process was stopped instead of exiting'''
148 if os.WIFEXITED(code):
150 if os.WIFEXITED(code):
149 return os.WEXITSTATUS(code)
151 return os.WEXITSTATUS(code)
150 elif os.WIFSIGNALED(code):
152 elif os.WIFSIGNALED(code):
151 return -os.WTERMSIG(code)
153 return -os.WTERMSIG(code)
152
154
# Everywhere except Windows, fork()-based workers are available: wire
# the POSIX implementations up as the platform hooks used by worker().
if os.name != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
156
158
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    # slice k gets elements k, k+nslices, k+2*nslices, ...
    for offset in range(nslices):
        yield lst[offset::nslices]
General Comments 0
You need to be logged in to leave comments. Login now