worker: migrate to util.iterfile
Jun Wu
r30396:78a58dcf default
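
This change replaces direct iteration over the result pipe with util.iterfile, a helper introduced shortly before this changeset. The motivation, as I understand it: on Python 2, `for line in fp` drives a C-level read-ahead buffer, which can hold completed results back from the master process and can misbehave when a read is interrupted by a signal (EINTR). A minimal sketch of what such a helper can look like — the actual util.iterfile implementation may differ:

    def iterfile(fp):
        # Yield lines one readline() at a time instead of using
        # fp.__iter__, which on Python 2 fills a hidden read-ahead
        # buffer. Each line surfaces as soon as it is available, and
        # no data is stranded in a buffer the caller cannot see.
        # (Sentinel '' matches a Python 2 text-mode pipe.)
        return iter(fp.readline, '')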
@@ -1,184 +1,187 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import errno
 import os
 import signal
 import sys
 import threading
 
 from .i18n import _
-from . import error
+from . import (
+    error,
+    util,
+)
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
 
     # posix
     try:
         n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass
 
     # windows
     try:
         n = int(os.environ['NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass
 
     return 1
 
 def _numworkers(ui):
     s = ui.config('worker', 'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
 if os.name == 'posix':
     _startupcost = 0.01
 else:
     _startupcost = 1e30
 
 def worthwhile(ui, costperop, nops):
     '''try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them'''
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_startupcost * workers + linear / workers)
     return benefit >= 0.15
 
 def worker(ui, costperarg, func, staticargs, args):
     '''run a function, possibly in parallel in multiple worker
     processes.
 
     returns a progress iterator
 
     costperarg - cost of a single task
 
     func - function to run
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
     workers
     '''
     if worthwhile(ui, costperarg, len(args)):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
     rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = [], [0]
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
             signal.signal(signal.SIGINT, oldhandler)
             try:
                 os.close(rfd)
                 for i, item in func(*(staticargs + (pargs,))):
                     os.write(wfd, '%d %s\n' % (i, item))
                 os._exit(0)
             except KeyboardInterrupt:
                 os._exit(255)
                 # other exceptions are allowed to propagate, we rely
                 # on lock.py's pid checks to avoid release callbacks
         pids.append(pid)
     pids.reverse()
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
     def killworkers():
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
     def waitforworkers():
         for _pid in pids:
             st = _exitstatus(os.wait()[1])
             if st and not problem[0]:
                 problem[0] = st
                 killworkers()
     t = threading.Thread(target=waitforworkers)
     t.start()
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         t.join()
         status = problem[0]
         if status:
             if status < 0:
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
-        for line in fp:
+        for line in util.iterfile(fp):
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
         killworkers()
         cleanup()
         raise
     cleanup()
 
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv
 
     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
 if os.name != 'nt':
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 
 def partition(lst, nslices):
     '''partition a list into N slices of roughly equal size
 
     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.
 
     mpm is not a fan of this partitioning strategy when files are involved.
     In his words:
 
         Single-threaded Mercurial makes a point of creating and visiting
         files in a fixed order (alphabetical). When creating files in order,
         a typical filesystem is likely to allocate them on nearby regions on
         disk. Thus, when revisiting in the same order, locality is maximized
         and various forms of OS and disk-level caching and read-ahead get a
         chance to work.
 
         This effect can be quite significant on spinning disks. I discovered it
         circa Mercurial v0.4 when revlogs were named by hashes of filenames.
         Tarring a repo and copying it to another disk effectively randomized
         the revlog ordering on disk by sorting the revlogs by hash and suddenly
         performance of my kernel checkout benchmark dropped by ~10x because the
         "working set" of sectors visited no longer fit in the drive's cache and
         the workload switched from streaming to random I/O.
 
         What we should really be doing is have workers read filenames from a
         ordered queue. This preserves locality and also keeps any worker from
         getting more than one file out of balance.
     '''
     for i in range(nslices):
         yield lst[i::nslices]
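
The worthwhile() heuristic estimates serial cost as costperop * nops and subtracts the parallel estimate: fork startup plus the same work divided across workers. A worked restatement of that arithmetic with assumed inputs (the workload numbers are mine, not from the source):

    # Assumed workload: 1000 items at 5ms each, 4 workers on POSIX,
    # where the source sets _startupcost = 0.01 and threshold 0.15.
    _startupcost = 0.01
    costperop, nops, workers = 0.005, 1000, 4

    linear = costperop * nops                 # 5.0 (serial estimate)
    benefit = linear - (_startupcost * workers + linear / workers)
    print(benefit)                            # 3.71 -> fork workers
    print(benefit >= 0.15)                    # True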
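Each forked worker reports progress over the pipe as one '%d %s\n' record per finished item, and the master's loop (now driven by util.iterfile) parses the records back into (count, item) pairs. A self-contained illustration of that framing, with made-up item names:

    # What a worker writes with os.write(wfd, '%d %s\n' % (i, item)):
    payload = ''.join('%d %s\n' % (i, item)
                      for i, item in [(1, 'file-a'), (1, 'file-b')])

    # What the master's read loop recovers:
    for line in payload.splitlines(True):     # keep the trailing '\n'
        l = line.split(' ', 1)
        print(int(l[0]), l[1][:-1])           # 1 file-a, then 1 file-b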
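For reference, partition()'s round-robin slicing sends element j to slice j mod N, so slice lengths differ by at most one. A quick standalone illustration (example values are mine, not from the source):

    def partition(lst, nslices):
        for i in range(nslices):
            yield lst[i::nslices]

    print(list(partition(list(range(10)), 3)))
    # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]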