worker: make waitforworkers reentrant...
Jun Wu
r30414:5069a8a4 default
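
For orientation, the hunk below rewrites waitforworkers() so it can safely be invoked more than once: it iterates over a copy of pids, removes each child from the set as soon as it has been reaped, retries os.waitpid() on EINTR, and treats ECHILD (the child was already reaped elsewhere) as harmless. The standalone sketch below restates that reaping pattern outside the patch; the reap() name and the success-path break are simplifications added here for illustration and are not part of the change itself.

import errno
import os

def reap(pids, blocking=True):
    # Illustrative sketch: reap any of the given child pids that have exited.
    # Safe to call repeatedly -- reaped pids leave the set, EINTR is retried,
    # and ECHILD (someone else already waited on the child) is ignored.
    for pid in pids.copy():       # copy: the set shrinks while we iterate
        p = st = 0
        while True:
            try:
                p, st = os.waitpid(pid, 0 if blocking else os.WNOHANG)
                break             # got an answer for this pid
            except OSError as e:
                if e.errno == errno.EINTR:
                    continue      # interrupted by a signal: retry
                elif e.errno == errno.ECHILD:
                    break         # child no longer waitable: ignore
                raise
        if p:
            pids.discard(p)       # never wait on this child again
            # a real caller would decode and act on the exit status 'st' here
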
@@ -1,188 +1,199 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15
16 16 from .i18n import _
17 17 from . import (
18 18 error,
19 19 util,
20 20 )
21 21
22 22 def countcpus():
23 23 '''try to count the number of CPUs on the system'''
24 24
25 25 # posix
26 26 try:
27 27 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
28 28 if n > 0:
29 29 return n
30 30 except (AttributeError, ValueError):
31 31 pass
32 32
33 33 # windows
34 34 try:
35 35 n = int(os.environ['NUMBER_OF_PROCESSORS'])
36 36 if n > 0:
37 37 return n
38 38 except (KeyError, ValueError):
39 39 pass
40 40
41 41 return 1
42 42
43 43 def _numworkers(ui):
44 44 s = ui.config('worker', 'numcpus')
45 45 if s:
46 46 try:
47 47 n = int(s)
48 48 if n >= 1:
49 49 return n
50 50 except ValueError:
51 51 raise error.Abort(_('number of cpus must be an integer'))
52 52 return min(max(countcpus(), 4), 32)
53 53
54 54 if os.name == 'posix':
55 55 _startupcost = 0.01
56 56 else:
57 57 _startupcost = 1e30
58 58
59 59 def worthwhile(ui, costperop, nops):
60 60 '''try to determine whether the benefit of multiple processes can
61 61 outweigh the cost of starting them'''
62 62 linear = costperop * nops
63 63 workers = _numworkers(ui)
64 64 benefit = linear - (_startupcost * workers + linear / workers)
65 65 return benefit >= 0.15
66 66
67 67 def worker(ui, costperarg, func, staticargs, args):
68 68 '''run a function, possibly in parallel in multiple worker
69 69 processes.
70 70
71 71 returns a progress iterator
72 72
73 73 costperarg - cost of a single task
74 74
75 75 func - function to run
76 76
77 77 staticargs - arguments to pass to every invocation of the function
78 78
79 79 args - arguments to split into chunks, to pass to individual
80 80 workers
81 81 '''
82 82 if worthwhile(ui, costperarg, len(args)):
83 83 return _platformworker(ui, func, staticargs, args)
84 84 return func(*staticargs + (args,))
85 85
86 86 def _posixworker(ui, func, staticargs, args):
87 87 rfd, wfd = os.pipe()
88 88 workers = _numworkers(ui)
89 89 oldhandler = signal.getsignal(signal.SIGINT)
90 90 signal.signal(signal.SIGINT, signal.SIG_IGN)
91 91 pids, problem = set(), [0]
92 92 def killworkers():
93 93 # if one worker bails, there's no good reason to wait for the rest
94 94 for p in pids:
95 95 try:
96 96 os.kill(p, signal.SIGTERM)
97 97 except OSError as err:
98 98 if err.errno != errno.ESRCH:
99 99 raise
100 100 def waitforworkers(blocking=True):
101 - for pid in pids:
102 - p, st = os.waitpid(pid, 0 if blocking else os.WNOHANG)
101 + for pid in pids.copy():
102 + p = st = 0
103 + while True:
104 + try:
105 + p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
106 + except OSError as e:
107 + if e.errno == errno.EINTR:
108 + continue
109 + elif e.errno == errno.ECHILD:
110 + break # ignore ECHILD
111 + else:
112 + raise
103 113 if p:
114 + pids.remove(p)
104 115 st = _exitstatus(st)
105 116 if st and not problem[0]:
106 117 problem[0] = st
107 118 killworkers()
108 119 for pargs in partition(args, workers):
109 120 pid = os.fork()
110 121 if pid == 0:
111 122 signal.signal(signal.SIGINT, oldhandler)
112 123 try:
113 124 os.close(rfd)
114 125 for i, item in func(*(staticargs + (pargs,))):
115 126 os.write(wfd, '%d %s\n' % (i, item))
116 127 os._exit(0)
117 128 except KeyboardInterrupt:
118 129 os._exit(255)
119 130 # other exceptions are allowed to propagate, we rely
120 131 # on lock.py's pid checks to avoid release callbacks
121 132 pids.add(pid)
122 133 os.close(wfd)
123 134 fp = os.fdopen(rfd, 'rb', 0)
124 135 t = threading.Thread(target=waitforworkers)
125 136 t.start()
126 137 def cleanup():
127 138 signal.signal(signal.SIGINT, oldhandler)
128 139 t.join()
129 140 status = problem[0]
130 141 if status:
131 142 if status < 0:
132 143 os.kill(os.getpid(), -status)
133 144 sys.exit(status)
134 145 try:
135 146 for line in util.iterfile(fp):
136 147 l = line.split(' ', 1)
137 148 yield int(l[0]), l[1][:-1]
138 149 except: # re-raises
139 150 killworkers()
140 151 cleanup()
141 152 raise
142 153 cleanup()
143 154
144 155 def _posixexitstatus(code):
145 156 '''convert a posix exit status into the same form returned by
146 157 os.spawnv
147 158
148 159 returns None if the process was stopped instead of exiting'''
149 160 if os.WIFEXITED(code):
150 161 return os.WEXITSTATUS(code)
151 162 elif os.WIFSIGNALED(code):
152 163 return -os.WTERMSIG(code)
153 164
154 165 if os.name != 'nt':
155 166 _platformworker = _posixworker
156 167 _exitstatus = _posixexitstatus
157 168
158 169 def partition(lst, nslices):
159 170 '''partition a list into N slices of roughly equal size
160 171
161 172 The current strategy takes every Nth element from the input. If
162 173 we ever write workers that need to preserve grouping in input
163 174 we should consider allowing callers to specify a partition strategy.
164 175
165 176 mpm is not a fan of this partitioning strategy when files are involved.
166 177 In his words:
167 178
168 179 Single-threaded Mercurial makes a point of creating and visiting
169 180 files in a fixed order (alphabetical). When creating files in order,
170 181 a typical filesystem is likely to allocate them on nearby regions on
171 182 disk. Thus, when revisiting in the same order, locality is maximized
172 183 and various forms of OS and disk-level caching and read-ahead get a
173 184 chance to work.
174 185
175 186 This effect can be quite significant on spinning disks. I discovered it
176 187 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
177 188 Tarring a repo and copying it to another disk effectively randomized
178 189 the revlog ordering on disk by sorting the revlogs by hash and suddenly
179 190 performance of my kernel checkout benchmark dropped by ~10x because the
180 191 "working set" of sectors visited no longer fit in the drive's cache and
181 192 the workload switched from streaming to random I/O.
182 193
183 194 What we should really be doing is have workers read filenames from a
184 195 ordered queue. This preserves locality and also keeps any worker from
185 196 getting more than one file out of balance.
186 197 '''
187 198 for i in range(nslices):
188 199 yield lst[i::nslices]
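
To round this out, here is a hedged usage sketch (not part of the patch) of how a caller might drive this module. The ui construction and the dowork() callable are assumptions for illustration; real callers inside Mercurial supply their own ui, cost estimate, and work list. worker() either runs func inline or forks workers, and either way the caller simply iterates the resulting (index, item) pairs. partition() itself is strided slicing: list(partition(range(10), 3)) yields [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]].

from mercurial import ui as uimod, worker

def dowork(staticarg, items):
    # func receives the static args plus one chunk of 'args'.  It yields
    # (count, item) pairs; in the forked case the master writes them as
    # '%d %s\n' over a pipe, so 'item' should be a short newline-free string.
    for n, item in enumerate(items):
        yield n, item

u = uimod.ui()   # Python 2-era constructor used at this revision; newer
                 # Mercurial releases construct it via ui.load() instead
for n, item in worker.worker(u, 0.001, dowork, (None,), ['a', 'b', 'c']):
    u.status('%d %s\n' % (n, item))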