##// END OF EJS Templates
worker: use absolute_import
Gregory Szorc -
r25992:2d76f8a2 default
parent child Browse files
Show More
@@ -1,158 +1,165 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from i18n import _
8 from __future__ import absolute_import
9 import errno, os, signal, sys, threading
9
10 import util
10 import errno
11 import os
12 import signal
13 import sys
14 import threading
15
16 from .i18n import _
17 from . import util
11
18
12 def countcpus():
19 def countcpus():
13 '''try to count the number of CPUs on the system'''
20 '''try to count the number of CPUs on the system'''
14
21
15 # posix
22 # posix
16 try:
23 try:
17 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
24 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
18 if n > 0:
25 if n > 0:
19 return n
26 return n
20 except (AttributeError, ValueError):
27 except (AttributeError, ValueError):
21 pass
28 pass
22
29
23 # windows
30 # windows
24 try:
31 try:
25 n = int(os.environ['NUMBER_OF_PROCESSORS'])
32 n = int(os.environ['NUMBER_OF_PROCESSORS'])
26 if n > 0:
33 if n > 0:
27 return n
34 return n
28 except (KeyError, ValueError):
35 except (KeyError, ValueError):
29 pass
36 pass
30
37
31 return 1
38 return 1
32
39
33 def _numworkers(ui):
40 def _numworkers(ui):
34 s = ui.config('worker', 'numcpus')
41 s = ui.config('worker', 'numcpus')
35 if s:
42 if s:
36 try:
43 try:
37 n = int(s)
44 n = int(s)
38 if n >= 1:
45 if n >= 1:
39 return n
46 return n
40 except ValueError:
47 except ValueError:
41 raise util.Abort(_('number of cpus must be an integer'))
48 raise util.Abort(_('number of cpus must be an integer'))
42 return min(max(countcpus(), 4), 32)
49 return min(max(countcpus(), 4), 32)
43
50
44 if os.name == 'posix':
51 if os.name == 'posix':
45 _startupcost = 0.01
52 _startupcost = 0.01
46 else:
53 else:
47 _startupcost = 1e30
54 _startupcost = 1e30
48
55
49 def worthwhile(ui, costperop, nops):
56 def worthwhile(ui, costperop, nops):
50 '''try to determine whether the benefit of multiple processes can
57 '''try to determine whether the benefit of multiple processes can
51 outweigh the cost of starting them'''
58 outweigh the cost of starting them'''
52 linear = costperop * nops
59 linear = costperop * nops
53 workers = _numworkers(ui)
60 workers = _numworkers(ui)
54 benefit = linear - (_startupcost * workers + linear / workers)
61 benefit = linear - (_startupcost * workers + linear / workers)
55 return benefit >= 0.15
62 return benefit >= 0.15
56
63
57 def worker(ui, costperarg, func, staticargs, args):
64 def worker(ui, costperarg, func, staticargs, args):
58 '''run a function, possibly in parallel in multiple worker
65 '''run a function, possibly in parallel in multiple worker
59 processes.
66 processes.
60
67
61 returns a progress iterator
68 returns a progress iterator
62
69
63 costperarg - cost of a single task
70 costperarg - cost of a single task
64
71
65 func - function to run
72 func - function to run
66
73
67 staticargs - arguments to pass to every invocation of the function
74 staticargs - arguments to pass to every invocation of the function
68
75
69 args - arguments to split into chunks, to pass to individual
76 args - arguments to split into chunks, to pass to individual
70 workers
77 workers
71 '''
78 '''
72 if worthwhile(ui, costperarg, len(args)):
79 if worthwhile(ui, costperarg, len(args)):
73 return _platformworker(ui, func, staticargs, args)
80 return _platformworker(ui, func, staticargs, args)
74 return func(*staticargs + (args,))
81 return func(*staticargs + (args,))
75
82
76 def _posixworker(ui, func, staticargs, args):
83 def _posixworker(ui, func, staticargs, args):
77 rfd, wfd = os.pipe()
84 rfd, wfd = os.pipe()
78 workers = _numworkers(ui)
85 workers = _numworkers(ui)
79 oldhandler = signal.getsignal(signal.SIGINT)
86 oldhandler = signal.getsignal(signal.SIGINT)
80 signal.signal(signal.SIGINT, signal.SIG_IGN)
87 signal.signal(signal.SIGINT, signal.SIG_IGN)
81 pids, problem = [], [0]
88 pids, problem = [], [0]
82 for pargs in partition(args, workers):
89 for pargs in partition(args, workers):
83 pid = os.fork()
90 pid = os.fork()
84 if pid == 0:
91 if pid == 0:
85 signal.signal(signal.SIGINT, oldhandler)
92 signal.signal(signal.SIGINT, oldhandler)
86 try:
93 try:
87 os.close(rfd)
94 os.close(rfd)
88 for i, item in func(*(staticargs + (pargs,))):
95 for i, item in func(*(staticargs + (pargs,))):
89 os.write(wfd, '%d %s\n' % (i, item))
96 os.write(wfd, '%d %s\n' % (i, item))
90 os._exit(0)
97 os._exit(0)
91 except KeyboardInterrupt:
98 except KeyboardInterrupt:
92 os._exit(255)
99 os._exit(255)
93 # other exceptions are allowed to propagate, we rely
100 # other exceptions are allowed to propagate, we rely
94 # on lock.py's pid checks to avoid release callbacks
101 # on lock.py's pid checks to avoid release callbacks
95 pids.append(pid)
102 pids.append(pid)
96 pids.reverse()
103 pids.reverse()
97 os.close(wfd)
104 os.close(wfd)
98 fp = os.fdopen(rfd, 'rb', 0)
105 fp = os.fdopen(rfd, 'rb', 0)
99 def killworkers():
106 def killworkers():
100 # if one worker bails, there's no good reason to wait for the rest
107 # if one worker bails, there's no good reason to wait for the rest
101 for p in pids:
108 for p in pids:
102 try:
109 try:
103 os.kill(p, signal.SIGTERM)
110 os.kill(p, signal.SIGTERM)
104 except OSError as err:
111 except OSError as err:
105 if err.errno != errno.ESRCH:
112 if err.errno != errno.ESRCH:
106 raise
113 raise
107 def waitforworkers():
114 def waitforworkers():
108 for _pid in pids:
115 for _pid in pids:
109 st = _exitstatus(os.wait()[1])
116 st = _exitstatus(os.wait()[1])
110 if st and not problem[0]:
117 if st and not problem[0]:
111 problem[0] = st
118 problem[0] = st
112 killworkers()
119 killworkers()
113 t = threading.Thread(target=waitforworkers)
120 t = threading.Thread(target=waitforworkers)
114 t.start()
121 t.start()
115 def cleanup():
122 def cleanup():
116 signal.signal(signal.SIGINT, oldhandler)
123 signal.signal(signal.SIGINT, oldhandler)
117 t.join()
124 t.join()
118 status = problem[0]
125 status = problem[0]
119 if status:
126 if status:
120 if status < 0:
127 if status < 0:
121 os.kill(os.getpid(), -status)
128 os.kill(os.getpid(), -status)
122 sys.exit(status)
129 sys.exit(status)
123 try:
130 try:
124 for line in fp:
131 for line in fp:
125 l = line.split(' ', 1)
132 l = line.split(' ', 1)
126 yield int(l[0]), l[1][:-1]
133 yield int(l[0]), l[1][:-1]
127 except: # re-raises
134 except: # re-raises
128 killworkers()
135 killworkers()
129 cleanup()
136 cleanup()
130 raise
137 raise
131 cleanup()
138 cleanup()
132
139
133 def _posixexitstatus(code):
140 def _posixexitstatus(code):
134 '''convert a posix exit status into the same form returned by
141 '''convert a posix exit status into the same form returned by
135 os.spawnv
142 os.spawnv
136
143
137 returns None if the process was stopped instead of exiting'''
144 returns None if the process was stopped instead of exiting'''
138 if os.WIFEXITED(code):
145 if os.WIFEXITED(code):
139 return os.WEXITSTATUS(code)
146 return os.WEXITSTATUS(code)
140 elif os.WIFSIGNALED(code):
147 elif os.WIFSIGNALED(code):
141 return -os.WTERMSIG(code)
148 return -os.WTERMSIG(code)
142
149
143 if os.name != 'nt':
150 if os.name != 'nt':
144 _platformworker = _posixworker
151 _platformworker = _posixworker
145 _exitstatus = _posixexitstatus
152 _exitstatus = _posixexitstatus
146
153
147 def partition(lst, nslices):
154 def partition(lst, nslices):
148 '''partition a list into N slices of equal size'''
155 '''partition a list into N slices of equal size'''
149 n = len(lst)
156 n = len(lst)
150 chunk, slop = n / nslices, n % nslices
157 chunk, slop = n / nslices, n % nslices
151 end = 0
158 end = 0
152 for i in xrange(nslices):
159 for i in xrange(nslices):
153 start = end
160 start = end
154 end = start + chunk
161 end = start + chunk
155 if slop:
162 if slop:
156 end += 1
163 end += 1
157 slop -= 1
164 slop -= 1
158 yield lst[start:end]
165 yield lst[start:end]
General Comments 0
You need to be logged in to leave comments. Login now