##// END OF EJS Templates
worker: manually buffer reads from pickle stream...
Jan Alexander Steffens (heftig) -
r44751:12491abf stable
parent child Browse files
Show More
@@ -65,6 +65,41 b' def _numworkers(ui):'
65 65 return min(max(countcpus(), 4), 32)
66 66
67 67
68 if pycompat.ispy3:
69
70 class _blockingreader(object):
71 def __init__(self, wrapped):
72 self._wrapped = wrapped
73
74 def __getattr__(self, attr):
75 return getattr(self._wrapped, attr)
76
77 # issue multiple reads until size is fulfilled
78 def read(self, size=-1):
79 if size < 0:
80 return self._wrapped.readall()
81
82 buf = bytearray(size)
83 view = memoryview(buf)
84 pos = 0
85
86 while pos < size:
87 ret = self._wrapped.readinto(view[pos:])
88 if not ret:
89 break
90 pos += ret
91
92 del view
93 del buf[pos:]
94 return buf
95
96
97 else:
98
99 def _blockingreader(wrapped):
100 return wrapped
101
102
68 103 if pycompat.isposix or pycompat.iswindows:
69 104 _STARTUP_COST = 0.01
70 105 # The Windows worker is thread based. If tasks are CPU bound, threads
@@ -226,7 +261,7 b' def _posixworker(ui, func, staticargs, a'
226 261 selector = selectors.DefaultSelector()
227 262 for rfd, wfd in pipes:
228 263 os.close(wfd)
229 selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
264 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
230 265
231 266 def cleanup():
232 267 signal.signal(signal.SIGINT, oldhandler)
@@ -240,7 +275,7 b' def _posixworker(ui, func, staticargs, a'
240 275 while openpipes > 0:
241 276 for key, events in selector.select():
242 277 try:
243 res = util.pickle.load(key.fileobj)
278 res = util.pickle.load(_blockingreader(key.fileobj))
244 279 if hasretval and res[0]:
245 280 retval.update(res[1])
246 281 else:
@@ -131,4 +131,35 b' Workers should not do cleanups in all ca'
131 131 abort: known exception
132 132 [255]
133 133
134 Do not crash on partially read result
135
136 $ cat > $TESTTMP/detecttruncated.py <<EOF
137 > from __future__ import absolute_import
138 > import os
139 > import sys
140 > import time
141 > sys.unraisablehook = lambda x: None
142 > oldwrite = os.write
143 > def splitwrite(fd, string):
144 > ret = oldwrite(fd, string[:9])
145 > if ret == 9:
146 > time.sleep(0.1)
147 > ret += oldwrite(fd, string[9:])
148 > return ret
149 > os.write = splitwrite
150 > EOF
151
152 $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
153 > "extensions.d=$TESTTMP/detecttruncated.py" test 100000.0
154 start
155 run
156 run
157 run
158 run
159 run
160 run
161 run
162 run
163 done
164
134 165 #endif
General Comments 0
You need to be logged in to leave comments. Login now