##// END OF EJS Templates
worker: use one pipe per posix worker and select() in parent process...
Danny Hooper -
r38752:9e6afe7f default
parent child Browse files
Show More
@@ -14,6 +14,12 b' import sys'
14 import threading
14 import threading
15 import time
15 import time
16
16
17 try:
18 import selectors
19 selectors.BaseSelector
20 except ImportError:
21 from .thirdparty import selectors2 as selectors
22
17 from .i18n import _
23 from .i18n import _
18 from . import (
24 from . import (
19 encoding,
25 encoding,
@@ -89,7 +95,6 b' def worker(ui, costperarg, func, statica'
89 return func(*staticargs + (args,))
95 return func(*staticargs + (args,))
90
96
91 def _posixworker(ui, func, staticargs, args):
97 def _posixworker(ui, func, staticargs, args):
92 rfd, wfd = os.pipe()
93 workers = _numworkers(ui)
98 workers = _numworkers(ui)
94 oldhandler = signal.getsignal(signal.SIGINT)
99 oldhandler = signal.getsignal(signal.SIGINT)
95 signal.signal(signal.SIGINT, signal.SIG_IGN)
100 signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +143,15 b' def _posixworker(ui, func, staticargs, a'
138 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
143 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
139 ui.flush()
144 ui.flush()
140 parentpid = os.getpid()
145 parentpid = os.getpid()
146 pipes = []
141 for pargs in partition(args, workers):
147 for pargs in partition(args, workers):
148 # Every worker gets its own pipe to send results on, so we don't have to
149 # implement atomic writes larger than PIPE_BUF. Each forked process has
150 # its own pipe's descriptors in the local variables, and the parent
151 # process has the full list of pipe descriptors (and it doesn't really
152 # care what order they're in).
153 rfd, wfd = os.pipe()
154 pipes.append((rfd, wfd))
142 # make sure we use os._exit in all worker code paths. otherwise the
155 # make sure we use os._exit in all worker code paths. otherwise the
143 # worker may do some clean-ups which could cause surprises like
156 # worker may do some clean-ups which could cause surprises like
144 # deadlock. see sshpeer.cleanup for example.
157 # deadlock. see sshpeer.cleanup for example.
@@ -154,6 +167,9 b' def _posixworker(ui, func, staticargs, a'
154 signal.signal(signal.SIGCHLD, oldchldhandler)
167 signal.signal(signal.SIGCHLD, oldchldhandler)
155
168
156 def workerfunc():
169 def workerfunc():
170 for r, w in pipes[:-1]:
171 os.close(r)
172 os.close(w)
157 os.close(rfd)
173 os.close(rfd)
158 for result in func(*(staticargs + (pargs,))):
174 for result in func(*(staticargs + (pargs,))):
159 os.write(wfd, util.pickle.dumps(result))
175 os.write(wfd, util.pickle.dumps(result))
@@ -175,8 +191,10 b' def _posixworker(ui, func, staticargs, a'
175 finally:
191 finally:
176 os._exit(ret & 255)
192 os._exit(ret & 255)
177 pids.add(pid)
193 pids.add(pid)
194 selector = selectors.DefaultSelector()
195 for rfd, wfd in pipes:
178 os.close(wfd)
196 os.close(wfd)
179 fp = os.fdopen(rfd, r'rb', 0)
197 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
180 def cleanup():
198 def cleanup():
181 signal.signal(signal.SIGINT, oldhandler)
199 signal.signal(signal.SIGINT, oldhandler)
182 waitforworkers()
200 waitforworkers()
@@ -187,11 +205,15 b' def _posixworker(ui, func, staticargs, a'
187 os.kill(os.getpid(), -status)
205 os.kill(os.getpid(), -status)
188 sys.exit(status)
206 sys.exit(status)
189 try:
207 try:
190 while True:
208 openpipes = len(pipes)
209 while openpipes > 0:
210 for key, events in selector.select():
191 try:
211 try:
192 yield util.pickle.load(fp)
212 yield util.pickle.load(key.fileobj)
193 except EOFError:
213 except EOFError:
194 break
214 selector.unregister(key.fileobj)
215 key.fileobj.close()
216 openpipes -= 1
195 except IOError as e:
217 except IOError as e:
196 if e.errno == errno.EINTR:
218 if e.errno == errno.EINTR:
197 continue
219 continue
General Comments 0
You need to be logged in to leave comments. Login now