worker: move killworkers and waitforworkers up...
Jun Wu
r30410:7a5d6e2f default
@@ -1,187 +1,187 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15
16 16 from .i18n import _
17 17 from . import (
18 18 error,
19 19 util,
20 20 )
21 21
22 22 def countcpus():
23 23 '''try to count the number of CPUs on the system'''
24 24
25 25 # posix
26 26 try:
27 27 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
28 28 if n > 0:
29 29 return n
30 30 except (AttributeError, ValueError):
31 31 pass
32 32
33 33 # windows
34 34 try:
35 35 n = int(os.environ['NUMBER_OF_PROCESSORS'])
36 36 if n > 0:
37 37 return n
38 38 except (KeyError, ValueError):
39 39 pass
40 40
41 41 return 1
42 42
43 43 def _numworkers(ui):
44 44 s = ui.config('worker', 'numcpus')
45 45 if s:
46 46 try:
47 47 n = int(s)
48 48 if n >= 1:
49 49 return n
50 50 except ValueError:
51 51 raise error.Abort(_('number of cpus must be an integer'))
52 52 return min(max(countcpus(), 4), 32)
53 53
54 54 if os.name == 'posix':
55 55 _startupcost = 0.01
56 56 else:
57 57 _startupcost = 1e30
58 58
59 59 def worthwhile(ui, costperop, nops):
60 60 '''try to determine whether the benefit of multiple processes can
61 61 outweigh the cost of starting them'''
62 62 linear = costperop * nops
63 63 workers = _numworkers(ui)
64 64 benefit = linear - (_startupcost * workers + linear / workers)
65 65 return benefit >= 0.15
66 66
67 67 def worker(ui, costperarg, func, staticargs, args):
68 68 '''run a function, possibly in parallel in multiple worker
69 69 processes.
70 70
71 71 returns a progress iterator
72 72
73 73 costperarg - cost of a single task
74 74
75 75 func - function to run
76 76
77 77 staticargs - arguments to pass to every invocation of the function
78 78
79 79 args - arguments to split into chunks, to pass to individual
80 80 workers
81 81 '''
82 82 if worthwhile(ui, costperarg, len(args)):
83 83 return _platformworker(ui, func, staticargs, args)
84 84 return func(*staticargs + (args,))
85 85
86 86 def _posixworker(ui, func, staticargs, args):
87 87 rfd, wfd = os.pipe()
88 88 workers = _numworkers(ui)
89 89 oldhandler = signal.getsignal(signal.SIGINT)
90 90 signal.signal(signal.SIGINT, signal.SIG_IGN)
91 91 pids, problem = [], [0]
92 def killworkers():
93 # if one worker bails, there's no good reason to wait for the rest
94 for p in pids:
95 try:
96 os.kill(p, signal.SIGTERM)
97 except OSError as err:
98 if err.errno != errno.ESRCH:
99 raise
100 def waitforworkers():
101 for _pid in pids:
102 st = _exitstatus(os.wait()[1])
103 if st and not problem[0]:
104 problem[0] = st
105 killworkers()
92 106 for pargs in partition(args, workers):
93 107 pid = os.fork()
94 108 if pid == 0:
95 109 signal.signal(signal.SIGINT, oldhandler)
96 110 try:
97 111 os.close(rfd)
98 112 for i, item in func(*(staticargs + (pargs,))):
99 113 os.write(wfd, '%d %s\n' % (i, item))
100 114 os._exit(0)
101 115 except KeyboardInterrupt:
102 116 os._exit(255)
103 117 # other exceptions are allowed to propagate, we rely
104 118 # on lock.py's pid checks to avoid release callbacks
105 119 pids.append(pid)
106 120 pids.reverse()
107 121 os.close(wfd)
108 122 fp = os.fdopen(rfd, 'rb', 0)
109 def killworkers():
110 # if one worker bails, there's no good reason to wait for the rest
111 for p in pids:
112 try:
113 os.kill(p, signal.SIGTERM)
114 except OSError as err:
115 if err.errno != errno.ESRCH:
116 raise
117 def waitforworkers():
118 for _pid in pids:
119 st = _exitstatus(os.wait()[1])
120 if st and not problem[0]:
121 problem[0] = st
122 killworkers()
123 123 t = threading.Thread(target=waitforworkers)
124 124 t.start()
125 125 def cleanup():
126 126 signal.signal(signal.SIGINT, oldhandler)
127 127 t.join()
128 128 status = problem[0]
129 129 if status:
130 130 if status < 0:
131 131 os.kill(os.getpid(), -status)
132 132 sys.exit(status)
133 133 try:
134 134 for line in util.iterfile(fp):
135 135 l = line.split(' ', 1)
136 136 yield int(l[0]), l[1][:-1]
137 137 except: # re-raises
138 138 killworkers()
139 139 cleanup()
140 140 raise
141 141 cleanup()
142 142
143 143 def _posixexitstatus(code):
144 144 '''convert a posix exit status into the same form returned by
145 145 os.spawnv
146 146
147 147 returns None if the process was stopped instead of exiting'''
148 148 if os.WIFEXITED(code):
149 149 return os.WEXITSTATUS(code)
150 150 elif os.WIFSIGNALED(code):
151 151 return -os.WTERMSIG(code)
152 152
153 153 if os.name != 'nt':
154 154 _platformworker = _posixworker
155 155 _exitstatus = _posixexitstatus
156 156
157 157 def partition(lst, nslices):
158 158 '''partition a list into N slices of roughly equal size
159 159
160 160 The current strategy takes every Nth element from the input. If
161 161 we ever write workers that need to preserve grouping in input
162 162 we should consider allowing callers to specify a partition strategy.
163 163
164 164 mpm is not a fan of this partitioning strategy when files are involved.
165 165 In his words:
166 166
167 167 Single-threaded Mercurial makes a point of creating and visiting
168 168 files in a fixed order (alphabetical). When creating files in order,
169 169 a typical filesystem is likely to allocate them on nearby regions on
170 170 disk. Thus, when revisiting in the same order, locality is maximized
171 171 and various forms of OS and disk-level caching and read-ahead get a
172 172 chance to work.
173 173
174 174 This effect can be quite significant on spinning disks. I discovered it
175 175 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
176 176 Tarring a repo and copying it to another disk effectively randomized
177 177 the revlog ordering on disk by sorting the revlogs by hash and suddenly
178 178 performance of my kernel checkout benchmark dropped by ~10x because the
179 179 "working set" of sectors visited no longer fit in the drive's cache and
180 180 the workload switched from streaming to random I/O.
181 181
182 182 What we should really be doing is have workers read filenames from a
183 183 ordered queue. This preserves locality and also keeps any worker from
184 184 getting more than one file out of balance.
185 185 '''
186 186 for i in range(nslices):
187 187 yield lst[i::nslices]
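
A back-of-the-envelope reading of worthwhile()'s cost model above, with made-up numbers (illustrative only, not taken from the change):

    # Suppose 1000 tasks at 0.01s each and 4 workers on a POSIX box,
    # where _startupcost is 0.01.
    linear = 0.01 * 1000                        # 10.0s serial estimate
    benefit = linear - (0.01 * 4 + linear / 4)  # 10.0 - 2.54 = 7.46
    # 7.46 >= 0.15, so worthwhile() reports that forking should pay off.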
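
As a usage sketch for worker() (the task function and its arguments here are hypothetical, not part of this file): the callable must yield (int, str) pairs, since that is the shape _posixworker writes across the pipe with '%d %s\n' and parses back on the reading side.

    def _checkfiles(ui, repo, files):
        # hypothetical task: emit one (index, filename) pair per file,
        # matching the '%d %s\n' protocol used by _posixworker
        for i, f in enumerate(files):
            yield i, f

    # Each chunk of 'files' goes to one forked worker (or everything runs
    # in-process when worthwhile() says parallelism is not worth it):
    # for i, f in worker(ui, 0.001, _checkfiles, (ui, repo), files):
    #     ui.progress('checking', i, item=f)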
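
Two illustrative conversions through _posixexitstatus(), assuming the usual Linux wait-status encoding (exit code in the high byte, terminating signal in the low bits):

    _posixexitstatus(0x0100)   # child exited with code 1  -> returns 1
    _posixexitstatus(0x000f)   # child killed by SIGTERM   -> returns -15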
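
And a quick look at what partition() produces for a small input, showing the every-Nth-element interleaving described in its docstring:

    files = ['a', 'b', 'c', 'd', 'e', 'f', 'g']
    list(partition(files, 3))
    # -> [['a', 'd', 'g'], ['b', 'e'], ['c', 'f']]
    # contiguous alphabetical runs are split across workers, which is
    # exactly the locality concern mpm raises above.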