##// END OF EJS Templates
worker: flush ui buffers before running the worker...
David Soria Parra -
r31696:9d3d56aa default
parent child Browse files
Show More
@@ -0,0 +1,54 b''
1 Test UI worker interaction
2
3 $ cat > t.py <<EOF
4 > from __future__ import absolute_import, print_function
5 > from mercurial import (
6 > cmdutil,
7 > ui as uimod,
8 > worker,
9 > )
10 > def runme(ui, args):
11 > for arg in args:
12 > ui.status('run\n')
13 > yield 1, arg
14 > cmdtable = {}
15 > command = cmdutil.command(cmdtable)
16 > @command('test', [], 'hg test [COST]')
17 > def t(ui, repo, cost=1.0):
18 > cost = float(cost)
19 > ui.status('start\n')
20 > runs = worker.worker(ui, cost, runme, (ui,), range(8))
21 > for n, i in runs:
22 > pass
23 > ui.status('done\n')
24 > EOF
25 $ abspath=`pwd`/t.py
26 $ hg init
27
28 Run tests with workers enabled by forcing a high cost
29
30 $ hg --config "extensions.t=$abspath" test 100000.0
31 start
32 run
33 run
34 run
35 run
36 run
37 run
38 run
39 run
40 done
41
42 Run tests without worker by forcing a low cost
43
44 $ hg --config "extensions.t=$abspath" test 0.0000001
45 start
46 run
47 run
48 run
49 run
50 run
51 run
52 run
53 run
54 done
@@ -1,231 +1,232 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14
14
15 from .i18n import _
15 from .i18n import _
16 from . import (
16 from . import (
17 encoding,
17 encoding,
18 error,
18 error,
19 pycompat,
19 pycompat,
20 scmutil,
20 scmutil,
21 util,
21 util,
22 )
22 )
23
23
def countcpus():
    '''try to count the number of CPUs on the system'''

    # POSIX: ask sysconf for the number of online processors
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if ncpus > 0:
            return ncpus
    except (AttributeError, ValueError):
        pass

    # Windows: the count is exposed through the environment
    try:
        ncpus = int(encoding.environ['NUMBER_OF_PROCESSORS'])
        if ncpus > 0:
            return ncpus
    except (KeyError, ValueError):
        pass

    # fall back to assuming a single CPU
    return 1
44
44
def _numworkers(ui):
    '''determine how many worker processes to use

    A positive integer in the 'worker.numcpus' config wins; otherwise
    use the detected CPU count raised to at least 4 and capped at 32.
    '''
    configured = ui.config('worker', 'numcpus')
    if configured:
        try:
            n = int(configured)
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
        if n >= 1:
            return n
    return min(max(countcpus(), 4), 32)
55
55
# Estimated cost of starting one worker process. Forking is cheap on
# POSIX; the huge value elsewhere makes worthwhile() always reject the
# parallel path on platforms without a fork-based implementation.
_startupcost = 0.01 if pycompat.osname == 'posix' else 1e30
60
60
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # parallel run pays worker startup plus a 1/N share of the serial work
    benefit = serialcost - (_startupcost * nworkers + serialcost / nworkers)
    return benefit >= 0.15
68
68
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if not worthwhile(ui, costperarg, len(args)):
        # not worth forking: do all the work in this process
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args)
87
87
def _posixworker(ui, func, staticargs, args):
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]
    def killworkers():
        # Unregister the SIGCHLD handler first: all children are about to
        # be killed, and this loop must not be interrupted by another
        # SIGCHLD mutating pids while we iterate over it.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for pid in pids:
            try:
                os.kill(pid, signal.SIGTERM)
            except OSError as exc:
                if exc.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        for pid in pids.copy():
            reaped = status = 0
            while True:
                try:
                    reaped, status = os.waitpid(
                        pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as exc:
                    if exc.errno == errno.EINTR:
                        continue
                    elif exc.errno == errno.ECHILD:
                        # child already reaped but pids not yet updated
                        # (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not reaped:
                # non-blocking poll found the child still running; skip
                # the bookkeeping below
                continue
            pids.discard(reaped)
            status = _exitstatus(status)
            if status and not problem[0]:
                problem[0] = status
    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    # flush buffered ui output now so forked children do not inherit
    # (and later replay) pending data
    ui.flush()
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            signal.signal(signal.SIGINT, oldhandler)
            signal.signal(signal.SIGCHLD, oldchldhandler)

            def workerfunc():
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))

            # make sure we use os._exit in all code paths. otherwise the
            # worker may do some clean-ups which could cause surprises
            # like deadlock. see sshpeer.cleanup for example.
            try:
                try:
                    scmutil.callcatch(ui, workerfunc)
                finally:
                    ui.flush()
            except KeyboardInterrupt:
                os._exit(255)
            except: # never return, therefore no re-raises
                try:
                    ui.traceback()
                    ui.flush()
                finally:
                    os._exit(255)
            else:
                os._exit(0)
        pids.add(pid)
    os.close(wfd)
    fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                # re-deliver the fatal signal to ourselves
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
186
187
187 def _posixexitstatus(code):
188 def _posixexitstatus(code):
188 '''convert a posix exit status into the same form returned by
189 '''convert a posix exit status into the same form returned by
189 os.spawnv
190 os.spawnv
190
191
191 returns None if the process was stopped instead of exiting'''
192 returns None if the process was stopped instead of exiting'''
192 if os.WIFEXITED(code):
193 if os.WIFEXITED(code):
193 return os.WEXITSTATUS(code)
194 return os.WEXITSTATUS(code)
194 elif os.WIFSIGNALED(code):
195 elif os.WIFSIGNALED(code):
195 return -os.WTERMSIG(code)
196 return -os.WTERMSIG(code)
196
197
# Everywhere but Windows, wire in the fork-based POSIX implementation.
if pycompat.osname != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
200
201
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy is round-robin: slice k gets elements
    k, k + N, k + 2N, ... of the input. If we ever write workers that
    need to preserve grouping in input we should consider allowing
    callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are
    involved. In his words: single-threaded Mercurial makes a point of
    creating and visiting files in a fixed (alphabetical) order, so a
    typical filesystem allocates them in nearby regions on disk, and
    revisiting them in the same order maximizes locality and lets OS
    and disk-level caching and read-ahead work. The effect can be quite
    significant on spinning disks: circa Mercurial v0.4, when revlogs
    were named by hashes of filenames, tarring a repo and copying it to
    another disk effectively randomized the on-disk revlog ordering and
    a kernel checkout benchmark dropped by ~10x because the "working
    set" of sectors no longer fit in the drive's cache and the workload
    switched from streaming to random I/O.

    What we should really be doing is have workers read filenames from
    an ordered queue. This preserves locality and also keeps any worker
    from getting more than one file out of balance.
    '''
    for start in range(nslices):
        yield lst[start::nslices]
General Comments 0
You need to be logged in to leave comments. Login now