worker: flush ui buffers before running the worker...
David Soria Parra
r31696:9d3d56aa default
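The change adds a ui.flush() in worker.py (second hunk below) so that anything buffered by the parent's ui is written out before the worker processes are forked. A minimal standalone sketch of the failure mode this avoids, assuming a POSIX system and stdout redirected to a file or pipe (plain Python, not Mercurial API):

    from __future__ import print_function
    import os
    import sys

    # Output buffered in the parent before os.fork() is inherited by every
    # child; each child then re-emits it when it flushes its own copy.
    sys.stdout.write('start\n')      # stays buffered while stdout is not a tty
    # sys.stdout.flush()             # the fix: flush *before* forking

    pids = []
    for _ in range(2):
        pid = os.fork()
        if pid == 0:                 # child
            sys.stdout.write('run\n')
            sys.stdout.flush()       # 'start' is emitted again, once per child
            os._exit(0)
        pids.append(pid)
    for pid in pids:
        os.waitpid(pid, 0)
    sys.stdout.flush()

    # Redirected to a file, 'start' appears three times; with the early flush
    # uncommented it appears exactly once, as in the test output below.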
@@ -0,0 +1,54 @@
1 Test UI worker interaction
2
3 $ cat > t.py <<EOF
4 > from __future__ import absolute_import, print_function
5 > from mercurial import (
6 > cmdutil,
7 > ui as uimod,
8 > worker,
9 > )
10 > def runme(ui, args):
11 > for arg in args:
12 > ui.status('run\n')
13 > yield 1, arg
14 > cmdtable = {}
15 > command = cmdutil.command(cmdtable)
16 > @command('test', [], 'hg test [COST]')
17 > def t(ui, repo, cost=1.0):
18 > cost = float(cost)
19 > ui.status('start\n')
20 > runs = worker.worker(ui, cost, runme, (ui,), range(8))
21 > for n, i in runs:
22 > pass
23 > ui.status('done\n')
24 > EOF
25 $ abspath=`pwd`/t.py
26 $ hg init
27
28 Run tests with worker enabled by forcing a high cost
29
30 $ hg --config "extensions.t=$abspath" test 100000.0
31 start
32 run
33 run
34 run
35 run
36 run
37 run
38 run
39 run
40 done
41
42 Run tests without worker by forcing a low cost
43
44 $ hg --config "extensions.t=$abspath" test 0.0000001
45 start
46 run
47 run
48 run
49 run
50 run
51 run
52 run
53 run
54 done
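
The two invocations above exercise both branches of worthwhile() in worker.py (second hunk below): a huge per-item cost makes forking workers look profitable, a tiny one does not. A rough worked check of that formula, assuming 4 workers and the POSIX _startupcost of 0.01 (the real values depend on the machine and the worker.numcpus setting):

    startupcost, workers, nops = 0.01, 4, 8   # assumed values; see worthwhile()

    for costperop in (100000.0, 0.0000001):
        linear = costperop * nops
        benefit = linear - (startupcost * workers + linear / workers)
        print(costperop, benefit >= 0.15)
    # 100000.0 True  -> workers are forked, children print the 'run' lines
    # 1e-07 False    -> func runs inline in the main process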
@@ -1,231 +1,232 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 encoding,
18 18 error,
19 19 pycompat,
20 20 scmutil,
21 21 util,
22 22 )
23 23
24 24 def countcpus():
25 25 '''try to count the number of CPUs on the system'''
26 26
27 27 # posix
28 28 try:
29 29 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
30 30 if n > 0:
31 31 return n
32 32 except (AttributeError, ValueError):
33 33 pass
34 34
35 35 # windows
36 36 try:
37 37 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
38 38 if n > 0:
39 39 return n
40 40 except (KeyError, ValueError):
41 41 pass
42 42
43 43 return 1
44 44
45 45 def _numworkers(ui):
46 46 s = ui.config('worker', 'numcpus')
47 47 if s:
48 48 try:
49 49 n = int(s)
50 50 if n >= 1:
51 51 return n
52 52 except ValueError:
53 53 raise error.Abort(_('number of cpus must be an integer'))
54 54 return min(max(countcpus(), 4), 32)
55 55
56 56 if pycompat.osname == 'posix':
57 57 _startupcost = 0.01
58 58 else:
59 59 _startupcost = 1e30
60 60
61 61 def worthwhile(ui, costperop, nops):
62 62 '''try to determine whether the benefit of multiple processes can
63 63 outweigh the cost of starting them'''
64 64 linear = costperop * nops
65 65 workers = _numworkers(ui)
66 66 benefit = linear - (_startupcost * workers + linear / workers)
67 67 return benefit >= 0.15
68 68
69 69 def worker(ui, costperarg, func, staticargs, args):
70 70 '''run a function, possibly in parallel in multiple worker
71 71 processes.
72 72
73 73 returns a progress iterator
74 74
75 75 costperarg - cost of a single task
76 76
77 77 func - function to run
78 78
79 79 staticargs - arguments to pass to every invocation of the function
80 80
81 81 args - arguments to split into chunks, to pass to individual
82 82 workers
83 83 '''
84 84 if worthwhile(ui, costperarg, len(args)):
85 85 return _platformworker(ui, func, staticargs, args)
86 86 return func(*staticargs + (args,))
87 87
88 88 def _posixworker(ui, func, staticargs, args):
89 89 rfd, wfd = os.pipe()
90 90 workers = _numworkers(ui)
91 91 oldhandler = signal.getsignal(signal.SIGINT)
92 92 signal.signal(signal.SIGINT, signal.SIG_IGN)
93 93 pids, problem = set(), [0]
94 94 def killworkers():
95 95 # unregister SIGCHLD handler as all children will be killed. This
96 96 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
97 97 # could be updated while iterating, which would cause inconsistency.
98 98 signal.signal(signal.SIGCHLD, oldchldhandler)
99 99 # if one worker bails, there's no good reason to wait for the rest
100 100 for p in pids:
101 101 try:
102 102 os.kill(p, signal.SIGTERM)
103 103 except OSError as err:
104 104 if err.errno != errno.ESRCH:
105 105 raise
106 106 def waitforworkers(blocking=True):
107 107 for pid in pids.copy():
108 108 p = st = 0
109 109 while True:
110 110 try:
111 111 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
112 112 break
113 113 except OSError as e:
114 114 if e.errno == errno.EINTR:
115 115 continue
116 116 elif e.errno == errno.ECHILD:
117 117 # child may already have been reaped, but pids not yet
118 118 # updated (maybe interrupted just after waitpid)
119 119 pids.discard(pid)
120 120 break
121 121 else:
122 122 raise
123 123 if not p:
124 124 # skip subsequent steps, because child process should
125 125 # be still running in this case
126 126 continue
127 127 pids.discard(p)
128 128 st = _exitstatus(st)
129 129 if st and not problem[0]:
130 130 problem[0] = st
131 131 def sigchldhandler(signum, frame):
132 132 waitforworkers(blocking=False)
133 133 if problem[0]:
134 134 killworkers()
135 135 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
136 ui.flush()
136 137 for pargs in partition(args, workers):
137 138 pid = os.fork()
138 139 if pid == 0:
139 140 signal.signal(signal.SIGINT, oldhandler)
140 141 signal.signal(signal.SIGCHLD, oldchldhandler)
141 142
142 143 def workerfunc():
143 144 os.close(rfd)
144 145 for i, item in func(*(staticargs + (pargs,))):
145 146 os.write(wfd, '%d %s\n' % (i, item))
146 147
147 148 # make sure we use os._exit in all code paths. otherwise the worker
148 149 # may do some clean-ups which could cause surprises like deadlock.
149 150 # see sshpeer.cleanup for example.
150 151 try:
151 152 try:
152 153 scmutil.callcatch(ui, workerfunc)
153 154 finally:
154 155 ui.flush()
155 156 except KeyboardInterrupt:
156 157 os._exit(255)
157 158 except: # never return, therefore no re-raises
158 159 try:
159 160 ui.traceback()
160 161 ui.flush()
161 162 finally:
162 163 os._exit(255)
163 164 else:
164 165 os._exit(0)
165 166 pids.add(pid)
166 167 os.close(wfd)
167 168 fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
168 169 def cleanup():
169 170 signal.signal(signal.SIGINT, oldhandler)
170 171 waitforworkers()
171 172 signal.signal(signal.SIGCHLD, oldchldhandler)
172 173 status = problem[0]
173 174 if status:
174 175 if status < 0:
175 176 os.kill(os.getpid(), -status)
176 177 sys.exit(status)
177 178 try:
178 179 for line in util.iterfile(fp):
179 180 l = line.split(' ', 1)
180 181 yield int(l[0]), l[1][:-1]
181 182 except: # re-raises
182 183 killworkers()
183 184 cleanup()
184 185 raise
185 186 cleanup()
186 187
187 188 def _posixexitstatus(code):
188 189 '''convert a posix exit status into the same form returned by
189 190 os.spawnv
190 191
191 192 returns None if the process was stopped instead of exiting'''
192 193 if os.WIFEXITED(code):
193 194 return os.WEXITSTATUS(code)
194 195 elif os.WIFSIGNALED(code):
195 196 return -os.WTERMSIG(code)
196 197
197 198 if pycompat.osname != 'nt':
198 199 _platformworker = _posixworker
199 200 _exitstatus = _posixexitstatus
200 201
201 202 def partition(lst, nslices):
202 203 '''partition a list into N slices of roughly equal size
203 204
204 205 The current strategy takes every Nth element from the input. If
205 206 we ever write workers that need to preserve grouping in input
206 207 we should consider allowing callers to specify a partition strategy.
207 208
208 209 mpm is not a fan of this partitioning strategy when files are involved.
209 210 In his words:
210 211
211 212 Single-threaded Mercurial makes a point of creating and visiting
212 213 files in a fixed order (alphabetical). When creating files in order,
213 214 a typical filesystem is likely to allocate them on nearby regions on
214 215 disk. Thus, when revisiting in the same order, locality is maximized
215 216 and various forms of OS and disk-level caching and read-ahead get a
216 217 chance to work.
217 218
218 219 This effect can be quite significant on spinning disks. I discovered it
219 220 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
220 221 Tarring a repo and copying it to another disk effectively randomized
221 222 the revlog ordering on disk by sorting the revlogs by hash and suddenly
222 223 performance of my kernel checkout benchmark dropped by ~10x because the
223 224 "working set" of sectors visited no longer fit in the drive's cache and
224 225 the workload switched from streaming to random I/O.
225 226
226 227 What we should really be doing is to have workers read filenames from
227 228 an ordered queue. This preserves locality and also keeps any worker from
228 229 getting more than one file out of balance.
229 230 '''
230 231 for i in range(nslices):
231 232 yield lst[i::nslices]
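
For reference, the interleaved slicing described in the docstring above can be seen with a small self-contained sketch (it simply repeats the two-line body rather than importing Mercurial):

    def partition(lst, nslices):      # same strategy as above
        for i in range(nslices):
            yield lst[i::nslices]

    print(list(partition(list(range(10)), 3)))
    # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]: every Nth element goes to the
    # same slice, so adjacent input items end up in different slices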