##// END OF EJS Templates
worker: use one pipe per posix worker and select() in parent process...
Danny Hooper -
r38752:9e6afe7f default
parent child Browse files
Show More
@@ -14,6 +14,12 b' import sys'
14 14 import threading
15 15 import time
16 16
17 try:
18 import selectors
19 selectors.BaseSelector
20 except ImportError:
21 from .thirdparty import selectors2 as selectors
22
17 23 from .i18n import _
18 24 from . import (
19 25 encoding,
@@ -89,7 +95,6 b' def worker(ui, costperarg, func, statica'
89 95 return func(*staticargs + (args,))
90 96
91 97 def _posixworker(ui, func, staticargs, args):
92 rfd, wfd = os.pipe()
93 98 workers = _numworkers(ui)
94 99 oldhandler = signal.getsignal(signal.SIGINT)
95 100 signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +143,15 b' def _posixworker(ui, func, staticargs, a'
138 143 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
139 144 ui.flush()
140 145 parentpid = os.getpid()
146 pipes = []
141 147 for pargs in partition(args, workers):
148 # Every worker gets its own pipe to send results on, so we don't have to
149 # implement atomic writes larger than PIPE_BUF. Each forked process has
150 # its own pipe's descriptors in the local variables, and the parent
151 # process has the full list of pipe descriptors (and it doesn't really
152 # care what order they're in).
153 rfd, wfd = os.pipe()
154 pipes.append((rfd, wfd))
142 155 # make sure we use os._exit in all worker code paths. otherwise the
143 156 # worker may do some clean-ups which could cause surprises like
144 157 # deadlock. see sshpeer.cleanup for example.
@@ -154,6 +167,9 b' def _posixworker(ui, func, staticargs, a'
154 167 signal.signal(signal.SIGCHLD, oldchldhandler)
155 168
156 169 def workerfunc():
170 for r, w in pipes[:-1]:
171 os.close(r)
172 os.close(w)
157 173 os.close(rfd)
158 174 for result in func(*(staticargs + (pargs,))):
159 175 os.write(wfd, util.pickle.dumps(result))
@@ -175,8 +191,10 b' def _posixworker(ui, func, staticargs, a'
175 191 finally:
176 192 os._exit(ret & 255)
177 193 pids.add(pid)
178 os.close(wfd)
179 fp = os.fdopen(rfd, r'rb', 0)
194 selector = selectors.DefaultSelector()
195 for rfd, wfd in pipes:
196 os.close(wfd)
197 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
180 198 def cleanup():
181 199 signal.signal(signal.SIGINT, oldhandler)
182 200 waitforworkers()
@@ -187,15 +205,19 b' def _posixworker(ui, func, staticargs, a'
187 205 os.kill(os.getpid(), -status)
188 206 sys.exit(status)
189 207 try:
190 while True:
191 try:
192 yield util.pickle.load(fp)
193 except EOFError:
194 break
195 except IOError as e:
196 if e.errno == errno.EINTR:
197 continue
198 raise
208 openpipes = len(pipes)
209 while openpipes > 0:
210 for key, events in selector.select():
211 try:
212 yield util.pickle.load(key.fileobj)
213 except EOFError:
214 selector.unregister(key.fileobj)
215 key.fileobj.close()
216 openpipes -= 1
217 except IOError as e:
218 if e.errno == errno.EINTR:
219 continue
220 raise
199 221 except: # re-raises
200 222 killworkers()
201 223 cleanup()
General Comments 0
You need to be logged in to leave comments. Login now