##// END OF EJS Templates
worker: use absolute_import
Gregory Szorc -
r25992:2d76f8a2 default
parent child Browse files
Show More
@@ -1,158 +1,165 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 from i18n import _
9 import errno, os, signal, sys, threading
10 import util
8 from __future__ import absolute_import
9
10 import errno
11 import os
12 import signal
13 import sys
14 import threading
15
16 from .i18n import _
17 from . import util
11 18
12 19 def countcpus():
13 20 '''try to count the number of CPUs on the system'''
14 21
15 22 # posix
16 23 try:
17 24 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
18 25 if n > 0:
19 26 return n
20 27 except (AttributeError, ValueError):
21 28 pass
22 29
23 30 # windows
24 31 try:
25 32 n = int(os.environ['NUMBER_OF_PROCESSORS'])
26 33 if n > 0:
27 34 return n
28 35 except (KeyError, ValueError):
29 36 pass
30 37
31 38 return 1
32 39
33 40 def _numworkers(ui):
34 41 s = ui.config('worker', 'numcpus')
35 42 if s:
36 43 try:
37 44 n = int(s)
38 45 if n >= 1:
39 46 return n
40 47 except ValueError:
41 48 raise util.Abort(_('number of cpus must be an integer'))
42 49 return min(max(countcpus(), 4), 32)
43 50
44 51 if os.name == 'posix':
45 52 _startupcost = 0.01
46 53 else:
47 54 _startupcost = 1e30
48 55
49 56 def worthwhile(ui, costperop, nops):
50 57 '''try to determine whether the benefit of multiple processes can
51 58 outweigh the cost of starting them'''
52 59 linear = costperop * nops
53 60 workers = _numworkers(ui)
54 61 benefit = linear - (_startupcost * workers + linear / workers)
55 62 return benefit >= 0.15
56 63
57 64 def worker(ui, costperarg, func, staticargs, args):
58 65 '''run a function, possibly in parallel in multiple worker
59 66 processes.
60 67
61 68 returns a progress iterator
62 69
63 70 costperarg - cost of a single task
64 71
65 72 func - function to run
66 73
67 74 staticargs - arguments to pass to every invocation of the function
68 75
69 76 args - arguments to split into chunks, to pass to individual
70 77 workers
71 78 '''
72 79 if worthwhile(ui, costperarg, len(args)):
73 80 return _platformworker(ui, func, staticargs, args)
74 81 return func(*staticargs + (args,))
75 82
76 83 def _posixworker(ui, func, staticargs, args):
77 84 rfd, wfd = os.pipe()
78 85 workers = _numworkers(ui)
79 86 oldhandler = signal.getsignal(signal.SIGINT)
80 87 signal.signal(signal.SIGINT, signal.SIG_IGN)
81 88 pids, problem = [], [0]
82 89 for pargs in partition(args, workers):
83 90 pid = os.fork()
84 91 if pid == 0:
85 92 signal.signal(signal.SIGINT, oldhandler)
86 93 try:
87 94 os.close(rfd)
88 95 for i, item in func(*(staticargs + (pargs,))):
89 96 os.write(wfd, '%d %s\n' % (i, item))
90 97 os._exit(0)
91 98 except KeyboardInterrupt:
92 99 os._exit(255)
93 100 # other exceptions are allowed to propagate, we rely
94 101 # on lock.py's pid checks to avoid release callbacks
95 102 pids.append(pid)
96 103 pids.reverse()
97 104 os.close(wfd)
98 105 fp = os.fdopen(rfd, 'rb', 0)
99 106 def killworkers():
100 107 # if one worker bails, there's no good reason to wait for the rest
101 108 for p in pids:
102 109 try:
103 110 os.kill(p, signal.SIGTERM)
104 111 except OSError as err:
105 112 if err.errno != errno.ESRCH:
106 113 raise
107 114 def waitforworkers():
108 115 for _pid in pids:
109 116 st = _exitstatus(os.wait()[1])
110 117 if st and not problem[0]:
111 118 problem[0] = st
112 119 killworkers()
113 120 t = threading.Thread(target=waitforworkers)
114 121 t.start()
115 122 def cleanup():
116 123 signal.signal(signal.SIGINT, oldhandler)
117 124 t.join()
118 125 status = problem[0]
119 126 if status:
120 127 if status < 0:
121 128 os.kill(os.getpid(), -status)
122 129 sys.exit(status)
123 130 try:
124 131 for line in fp:
125 132 l = line.split(' ', 1)
126 133 yield int(l[0]), l[1][:-1]
127 134 except: # re-raises
128 135 killworkers()
129 136 cleanup()
130 137 raise
131 138 cleanup()
132 139
133 140 def _posixexitstatus(code):
134 141 '''convert a posix exit status into the same form returned by
135 142 os.spawnv
136 143
137 144 returns None if the process was stopped instead of exiting'''
138 145 if os.WIFEXITED(code):
139 146 return os.WEXITSTATUS(code)
140 147 elif os.WIFSIGNALED(code):
141 148 return -os.WTERMSIG(code)
142 149
143 150 if os.name != 'nt':
144 151 _platformworker = _posixworker
145 152 _exitstatus = _posixexitstatus
146 153
147 154 def partition(lst, nslices):
148 155 '''partition a list into N slices of equal size'''
149 156 n = len(lst)
150 157 chunk, slop = n / nslices, n % nslices
151 158 end = 0
152 159 for i in xrange(nslices):
153 160 start = end
154 161 end = start + chunk
155 162 if slop:
156 163 end += 1
157 164 slop -= 1
158 165 yield lst[start:end]
General Comments 0
You need to be logged in to leave comments. Login now