keepalive: rewrite readinto() to not use read()...
Augie Fackler
r37605:192b7ad0 default
@@ -1,738 +1,751 b''
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see
# <http://www.gnu.org/licenses/>.

# This file is part of urlgrabber, a high-level cross-protocol url-grabber
# Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko

# Modified by Benoit Boissinot:
#  - fix for digest auth (inspired from urllib2.py @ Python v2.4)
# Modified by Dirkjan Ochtman:
#  - import md5 function from a local util module
# Modified by Augie Fackler:
#  - add safesend method and use it to prevent broken pipe errors
#    on large POST requests

"""An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.

>>> import urllib2
>>> from keepalive import HTTPHandler
>>> keepalive_handler = HTTPHandler()
>>> opener = urlreq.buildopener(keepalive_handler)
>>> urlreq.installopener(opener)
>>>
>>> fo = urlreq.urlopen('http://www.python.org')

If a connection to a given host is requested, and all of the existing
connections are still in use, another connection will be opened.  If
the handler tries to use an existing connection but it fails in some
way, it will be closed and removed from the pool.

To remove the handler, simply re-run build_opener with no arguments, and
install that opener.

You can explicitly close connections by using the close_connection()
method of the returned file-like object (described below) or you can
use the handler methods:

  close_connection(host)
  close_all()
  open_connections()

NOTE: using the close_connection and close_all methods of the handler
should be done with care when using multiple threads.
  * there is nothing that prevents another thread from creating new
    connections immediately after connections are closed
  * no checks are done to prevent in-use connections from being closed

>>> keepalive_handler.close_all()

EXTRA ATTRIBUTES AND METHODS

  Upon a status of 200, the object returned has a few additional
  attributes and methods, which should not be used if you want to
  remain consistent with the normal urllib2-returned objects:

    close_connection()  -  close the connection to the host
    readlines()         -  you know, readlines()
    status              -  the return status (i.e. 404)
    reason              -  english translation of status (i.e. 'File not found')

  If you want the best of both worlds, use this inside an
  AttributeError-catching try:

  >>> try: status = fo.status
  >>> except AttributeError: status = None

  Unfortunately, these are ONLY there if status == 200, so it's not
  easy to distinguish between non-200 responses.  The reason is that
  urllib2 tries to do clever things with error codes 301, 302, 401,
  and 407, and it wraps the object upon return.
"""

# $Id: keepalive.py,v 1.14 2006/04/04 21:00:32 mstenner Exp $

from __future__ import absolute_import, print_function

import errno
import hashlib
import socket
import sys
import threading

from .i18n import _
from . import (
    node,
    pycompat,
    urllibcompat,
    util,
)
from .utils import (
    procutil,
)

httplib = util.httplib
urlerr = util.urlerr
urlreq = util.urlreq

DEBUG = None

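# Editorial note, not part of the original file: when DEBUG is set, it is
# used as a logger, so it must expose info()/error()-style methods that
# accept printf-style arguments (FakeLogger in test_timeout() below shows
# the minimal interface). A sketch using only the stdlib;
# _enable_keepalive_debug is a hypothetical helper name.
def _enable_keepalive_debug():
    global DEBUG
    import logging
    logging.basicConfig(level=logging.DEBUG)
    DEBUG = logging.getLogger('keepalive')
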
class ConnectionManager(object):
    """
    The connection manager must be able to:
      * keep track of all existing
    """
    def __init__(self):
        self._lock = threading.Lock()
        self._hostmap = {} # map hosts to a list of connections
        self._connmap = {} # map connections to host
        self._readymap = {} # map connection to ready state

    def add(self, host, connection, ready):
        self._lock.acquire()
        try:
            if host not in self._hostmap:
                self._hostmap[host] = []
            self._hostmap[host].append(connection)
            self._connmap[connection] = host
            self._readymap[connection] = ready
        finally:
            self._lock.release()

    def remove(self, connection):
        self._lock.acquire()
        try:
            try:
                host = self._connmap[connection]
            except KeyError:
                pass
            else:
                del self._connmap[connection]
                del self._readymap[connection]
                self._hostmap[host].remove(connection)
                if not self._hostmap[host]:
                    del self._hostmap[host]
        finally:
            self._lock.release()

    def set_ready(self, connection, ready):
        try:
            self._readymap[connection] = ready
        except KeyError:
            pass

    def get_ready_conn(self, host):
        conn = None
        self._lock.acquire()
        try:
            if host in self._hostmap:
                for c in self._hostmap[host]:
                    if self._readymap[c]:
                        self._readymap[c] = 0
                        conn = c
                        break
        finally:
            self._lock.release()
        return conn

    def get_all(self, host=None):
        if host:
            return list(self._hostmap.get(host, []))
        else:
            return dict(self._hostmap)

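# Editorial sketch, not part of the original file: the pool semantics of
# ConnectionManager above. Any object can stand in for a connection, and
# _demo_connection_manager is a hypothetical name.
def _demo_connection_manager():
    cm = ConnectionManager()
    conn = object()
    cm.add('example.com:80', conn, ready=1)
    # get_ready_conn hands out the connection and atomically marks it busy...
    assert cm.get_ready_conn('example.com:80') is conn
    # ...so a second caller gets nothing until set_ready() is called.
    assert cm.get_ready_conn('example.com:80') is None
    cm.set_ready(conn, 1)
    assert cm.get_ready_conn('example.com:80') is conn
    cm.remove(conn)
    assert cm.get_all('example.com:80') == []
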
class KeepAliveHandler(object):
    def __init__(self):
        self._cm = ConnectionManager()

    #### Connection Management
    def open_connections(self):
        """return a list of connected hosts and the number of connections
        to each.  [('foo.com:80', 2), ('bar.org', 1)]"""
        return [(host, len(li)) for (host, li) in self._cm.get_all().items()]

    def close_connection(self, host):
        """close connection(s) to <host>
        host is the host:port spec, as in 'www.cnn.com:8080' as passed in.
        no error occurs if there is no connection to that host."""
        for h in self._cm.get_all(host):
            self._cm.remove(h)
            h.close()

    def close_all(self):
        """close all open connections"""
        for host, conns in self._cm.get_all().iteritems():
            for h in conns:
                self._cm.remove(h)
                h.close()

    def _request_closed(self, request, host, connection):
        """tells us that this request is now closed and that the
        connection is ready for another request"""
        self._cm.set_ready(connection, 1)

    def _remove_connection(self, host, connection, close=0):
        if close:
            connection.close()
        self._cm.remove(connection)

    #### Transaction Execution
    def http_open(self, req):
        return self.do_open(HTTPConnection, req)

    def do_open(self, http_class, req):
        host = urllibcompat.gethost(req)
        if not host:
            raise urlerr.urlerror('no host given')

        try:
            h = self._cm.get_ready_conn(host)
            while h:
                r = self._reuse_connection(h, req, host)

                # if this response is non-None, then it worked and we're
                # done.  Break out, skipping the else block.
                if r:
                    break

                # connection is bad - possibly closed by server
                # discard it and ask for the next free connection
                h.close()
                self._cm.remove(h)
                h = self._cm.get_ready_conn(host)
            else:
                # no (working) free connections were found.  Create a new one.
                h = http_class(host)
                if DEBUG:
                    DEBUG.info("creating new connection to %s (%d)",
                               host, id(h))
                self._cm.add(host, h, 0)
                self._start_transaction(h, req)
                r = h.getresponse()
        # The string form of BadStatusLine is the status line. Add some context
        # to make the error message slightly more useful.
        except httplib.BadStatusLine as err:
            raise urlerr.urlerror(
                _('bad HTTP status line: %s') % pycompat.sysbytes(err.line))
        except (socket.error, httplib.HTTPException) as err:
            raise urlerr.urlerror(err)

        # if not a persistent connection, don't try to reuse it
        if r.will_close:
            self._cm.remove(h)

        if DEBUG:
            DEBUG.info("STATUS: %s, %s", r.status, r.reason)
        r._handler = self
        r._host = host
        r._url = req.get_full_url()
        r._connection = h
        r.code = r.status
        r.headers = r.msg
        r.msg = r.reason

        return r

    def _reuse_connection(self, h, req, host):
        """start the transaction with a re-used connection
        return a response object (r) upon success or None on failure.
        This DOES not close or remove bad connections in cases where
        it returns.  However, if an unexpected exception occurs, it
        will close and remove the connection before re-raising.
        """
        try:
            self._start_transaction(h, req)
            r = h.getresponse()
            # note: just because we got something back doesn't mean it
            # worked.  We'll check the version below, too.
        except (socket.error, httplib.HTTPException):
            r = None
        except: # re-raises
            # adding this block just in case we've missed
            # something we will still raise the exception, but
            # lets try and close the connection and remove it
            # first.  We previously got into a nasty loop
            # where an exception was uncaught, and so the
            # connection stayed open.  On the next try, the
            # same exception was raised, etc.  The trade-off is
            # that it's now possible this call will raise
            # a DIFFERENT exception
            if DEBUG:
                DEBUG.error("unexpected exception - closing "
                            "connection to %s (%d)", host, id(h))
            self._cm.remove(h)
            h.close()
            raise

        if r is None or r.version == 9:
            # httplib falls back to assuming HTTP 0.9 if it gets a
            # bad header back.  This is most likely to happen if
            # the socket has been closed by the server since we
            # last used the connection.
            if DEBUG:
                DEBUG.info("failed to re-use connection to %s (%d)",
                           host, id(h))
            r = None
        else:
            if DEBUG:
                DEBUG.info("re-using connection to %s (%d)", host, id(h))

        return r

    def _start_transaction(self, h, req):
        # What follows mostly reimplements HTTPConnection.request()
        # except it adds self.parent.addheaders in the mix and sends headers
        # in a deterministic order (to make testing easier).
        headers = util.sortdict(self.parent.addheaders)
        headers.update(sorted(req.headers.items()))
        headers.update(sorted(req.unredirected_hdrs.items()))
        headers = util.sortdict((n.lower(), v) for n, v in headers.items())
        skipheaders = {}
        for n in ('host', 'accept-encoding'):
            if n in headers:
                skipheaders['skip_' + n.replace('-', '_')] = 1
        try:
            if urllibcompat.hasdata(req):
                data = urllibcompat.getdata(req)
                h.putrequest(
                    req.get_method(), urllibcompat.getselector(req),
                    **pycompat.strkwargs(skipheaders))
                if r'content-type' not in headers:
                    h.putheader(r'Content-type',
                                r'application/x-www-form-urlencoded')
                if r'content-length' not in headers:
                    h.putheader(r'Content-length', r'%d' % len(data))
            else:
                h.putrequest(
                    req.get_method(), urllibcompat.getselector(req),
                    **pycompat.strkwargs(skipheaders))
        except socket.error as err:
            raise urlerr.urlerror(err)
        for k, v in headers.items():
            h.putheader(k, v)
        h.endheaders()
        if urllibcompat.hasdata(req):
            h.send(data)

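# Editorial sketch, not part of the original file: the header normalization
# done in _start_transaction() above - later sources win, names are
# lowercased, and the final order is deterministic. collections.OrderedDict
# stands in for util.sortdict, and _demo_header_merge is a hypothetical name.
def _demo_header_merge():
    import collections
    headers = collections.OrderedDict([('User-Agent', 'mercurial')])
    # request headers are merged in sorted order, as in the method above
    headers.update(sorted({'Accept': '*/*', 'Host': 'example.com'}.items()))
    headers = collections.OrderedDict(
        (n.lower(), v) for n, v in headers.items())
    return list(headers.items())
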
class HTTPHandler(KeepAliveHandler, urlreq.httphandler):
    pass

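# Editorial sketch, not part of the original file: wiring the handler into
# an opener, as the module docstring describes, but using the urlreq aliases
# defined above. _demo_install_opener is a hypothetical name.
def _demo_install_opener():
    opener = urlreq.buildopener(HTTPHandler())
    urlreq.installopener(opener)
    return opener
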
class HTTPResponse(httplib.HTTPResponse):
    # we need to subclass HTTPResponse in order to
    # 1) add readline(), readlines(), and readinto() methods
    # 2) add close_connection() methods
    # 3) add info() and geturl() methods

    # in order to add readline(), read must be modified to deal with a
    # buffer.  example: readline must read a buffer and then spit back
    # one line at a time.  The only real alternative is to read one
    # BYTE at a time (ick).  Once something has been read, it can't be
    # put back (ok, maybe it can, but that's even uglier than this),
    # so if you THEN do a normal read, you must first take stuff from
    # the buffer.

    # the read method wraps the original to accommodate buffering,
    # although read() never adds to the buffer.
    # Both readline and readlines have been stolen with almost no
    # modification from socket.py


    def __init__(self, sock, debuglevel=0, strict=0, method=None):
        extrakw = {}
        if not pycompat.ispy3:
            extrakw[r'strict'] = True
            extrakw[r'buffering'] = True
        httplib.HTTPResponse.__init__(self, sock, debuglevel=debuglevel,
                                      method=method, **extrakw)
        self.fileno = sock.fileno
        self.code = None
        self._rbuf = ''
        self._rbufsize = 8096
        self._handler = None # inserted by the handler later
        self._host = None # (same)
        self._url = None # (same)
        self._connection = None # (same)

    _raw_read = httplib.HTTPResponse.read
    _raw_readinto = getattr(httplib.HTTPResponse, 'readinto', None)

    def close(self):
        if self.fp:
            self.fp.close()
            self.fp = None
            if self._handler:
                self._handler._request_closed(self, self._host,
                                              self._connection)

    def close_connection(self):
        self._handler._remove_connection(self._host, self._connection, close=1)
        self.close()

    def info(self):
        return self.headers

    def geturl(self):
        return self._url

    def read(self, amt=None):
        # the _rbuf test is only in this first if for speed.  It's not
        # logically necessary
        if self._rbuf and amt is not None:
            L = len(self._rbuf)
            if amt > L:
                amt -= L
            else:
                s = self._rbuf[:amt]
                self._rbuf = self._rbuf[amt:]
                return s

        s = self._rbuf + self._raw_read(amt)
        self._rbuf = ''
        return s

    # stolen from Python SVN #68532 to fix issue1088
    def _read_chunked(self, amt):
        chunk_left = self.chunk_left
        parts = []

        while True:
            if chunk_left is None:
                line = self.fp.readline()
                i = line.find(';')
                if i >= 0:
                    line = line[:i] # strip chunk-extensions
                try:
                    chunk_left = int(line, 16)
                except ValueError:
                    # close the connection as protocol synchronization is
                    # probably lost
                    self.close()
                    raise httplib.IncompleteRead(''.join(parts))
                if chunk_left == 0:
                    break
            if amt is None:
                parts.append(self._safe_read(chunk_left))
            elif amt < chunk_left:
                parts.append(self._safe_read(amt))
                self.chunk_left = chunk_left - amt
                return ''.join(parts)
            elif amt == chunk_left:
                parts.append(self._safe_read(amt))
                self._safe_read(2) # toss the CRLF at the end of the chunk
                self.chunk_left = None
                return ''.join(parts)
            else:
                parts.append(self._safe_read(chunk_left))
                amt -= chunk_left

            # we read the whole chunk, get another
            self._safe_read(2) # toss the CRLF at the end of the chunk
            chunk_left = None

        # read and discard trailer up to the CRLF terminator
        ### note: we shouldn't have any trailers!
        while True:
            line = self.fp.readline()
            if not line:
                # a vanishingly small number of sites EOF without
                # sending the trailer
                break
            if line == '\r\n':
                break

        # we read everything; close the "file"
        self.close()

        return ''.join(parts)

    def readline(self):
        # Fast path for a line is already available in read buffer.
        i = self._rbuf.find('\n')
        if i >= 0:
            i += 1
            line = self._rbuf[:i]
            self._rbuf = self._rbuf[i:]
            return line

        # No newline in local buffer. Read until we find one.
        chunks = [self._rbuf]
        i = -1
        readsize = self._rbufsize
        while True:
            new = self._raw_read(readsize)
            if not new:
                break

            chunks.append(new)
            i = new.find('\n')
            if i >= 0:
                break

        # We either have exhausted the stream or have a newline in chunks[-1].

        # EOF
        if i == -1:
            self._rbuf = ''
            return ''.join(chunks)

        i += 1
        self._rbuf = chunks[-1][i:]
        chunks[-1] = chunks[-1][:i]
        return ''.join(chunks)

    def readlines(self, sizehint=0):
        total = 0
        list = []
        while True:
            line = self.readline()
            if not line:
                break
            list.append(line)
            total += len(line)
            if sizehint and total >= sizehint:
                break
        return list

    def readinto(self, dest):
        if self._raw_readinto is None:
            res = self.read(len(dest))
            if not res:
                return 0
            dest[0:len(res)] = res
            return len(res)
        total = len(dest)
        have = len(self._rbuf)
        if have >= total:
            dest[0:total] = self._rbuf[:total]
            self._rbuf = self._rbuf[total:]
            return total
        mv = memoryview(dest)
        got = self._raw_readinto(mv[have:total])
        dest[0:have] = self._rbuf
        got += len(self._rbuf)
        self._rbuf = ''
        return got

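# Editorial sketch, not part of the original file: the contract the
# rewritten readinto() honors - fill `dest` with at most len(dest) bytes and
# return the count written - plus the memoryview trick it uses to let the
# raw readinto() fill only the part of `dest` the local buffer cannot.
# _demo_readinto_shape is a hypothetical name; io.BytesIO stands in for a
# response object.
def _demo_readinto_shape():
    import io
    dest = bytearray(8)
    assert io.BytesIO(b'hello').readinto(dest) == 5
    assert bytes(dest[:5]) == b'hello'
    # memoryview slicing writes into a sub-range of dest without copying,
    # mirroring self._raw_readinto(mv[have:total]) above.
    mv = memoryview(dest)
    mv[5:8] = b'xyz'
    assert bytes(dest) == b'helloxyz'
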
def safesend(self, str):
    """Send `str' to the server.

    Shamelessly ripped off from httplib to patch a bad behavior.
    """
    # _broken_pipe_resp is an attribute we set in this function
    # if the socket is closed while we're sending data but
    # the server sent us a response before hanging up.
    # In that case, we want to pretend to send the rest of the
    # outgoing data, and then let the user use getresponse()
    # (which we wrap) to get this last response before
    # opening a new socket.
    if getattr(self, '_broken_pipe_resp', None) is not None:
        return

    if self.sock is None:
        if self.auto_open:
            self.connect()
        else:
            raise httplib.NotConnected

    # send the data to the server. if we get a broken pipe, then close
    # the socket. we want to reconnect when somebody tries to send again.
    #
    # NOTE: we DO propagate the error, though, because we cannot simply
    #       ignore the error... the caller will know if they can retry.
    if self.debuglevel > 0:
        print("send:", repr(str))
    try:
        blocksize = 8192
        read = getattr(str, 'read', None)
        if read is not None:
            if self.debuglevel > 0:
                print("sending a read()able")
            data = read(blocksize)
            while data:
                self.sock.sendall(data)
                data = read(blocksize)
        else:
            self.sock.sendall(str)
    except socket.error as v:
        reraise = True
        if v[0] == errno.EPIPE: # Broken pipe
            if self._HTTPConnection__state == httplib._CS_REQ_SENT:
                self._broken_pipe_resp = None
                self._broken_pipe_resp = self.getresponse()
                reraise = False
            self.close()
        if reraise:
            raise

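# Editorial sketch, not part of the original file: the read()-able branch of
# safesend() above streams a file-like body in fixed-size blocks instead of
# loading it whole. A standalone equivalent; _demo_stream_send is a
# hypothetical name, and `sock` may be anything exposing sendall().
def _demo_stream_send(sock, body, blocksize=8192):
    data = body.read(blocksize)
    while data:
        sock.sendall(data)
        data = body.read(blocksize)
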
def wrapgetresponse(cls):
    """Wraps getresponse in cls with a broken-pipe sane version.
    """
    def safegetresponse(self):
        # In safesend() we might set the _broken_pipe_resp
        # attribute, in which case the socket has already
        # been closed and we just need to give them the response
        # back. Otherwise, we use the normal response path.
        r = getattr(self, '_broken_pipe_resp', None)
        if r is not None:
            return r
        return cls.getresponse(self)
    safegetresponse.__doc__ = cls.getresponse.__doc__
    return safegetresponse

class HTTPConnection(httplib.HTTPConnection):
    # use the modified response class
    response_class = HTTPResponse
    send = safesend
    getresponse = wrapgetresponse(httplib.HTTPConnection)


#########################################################################
#####   TEST FUNCTIONS
#########################################################################


def continuity(url):
    md5 = hashlib.md5
    format = '%25s: %s'

    # first fetch the file with the normal http handler
    opener = urlreq.buildopener()
    urlreq.installopener(opener)
    fo = urlreq.urlopen(url)
    foo = fo.read()
    fo.close()
    m = md5(foo)
    print(format % ('normal urllib', node.hex(m.digest())))

    # now install the keepalive handler and try again
    opener = urlreq.buildopener(HTTPHandler())
    urlreq.installopener(opener)

    fo = urlreq.urlopen(url)
    foo = fo.read()
    fo.close()
    m = md5(foo)
    print(format % ('keepalive read', node.hex(m.digest())))

    fo = urlreq.urlopen(url)
    foo = ''
    while True:
        f = fo.readline()
        if f:
            foo = foo + f
        else:
            break
    fo.close()
    m = md5(foo)
    print(format % ('keepalive readline', node.hex(m.digest())))

def comp(N, url):
    print('  making %i connections to:\n  %s' % (N, url))

    procutil.stdout.write('  first using the normal urllib handlers')
    # first use normal opener
    opener = urlreq.buildopener()
    urlreq.installopener(opener)
    t1 = fetch(N, url)
    print('  TIME: %.3f s' % t1)

    procutil.stdout.write('  now using the keepalive handler       ')
    # now install the keepalive handler and try again
    opener = urlreq.buildopener(HTTPHandler())
    urlreq.installopener(opener)
    t2 = fetch(N, url)
    print('  TIME: %.3f s' % t2)
    print('  improvement factor: %.2f' % (t1 / t2))

def fetch(N, url, delay=0):
    import time
    lens = []
    starttime = time.time()
    for i in range(N):
        if delay and i > 0:
            time.sleep(delay)
        fo = urlreq.urlopen(url)
        foo = fo.read()
        fo.close()
        lens.append(len(foo))
    diff = time.time() - starttime

    j = 0
    for i in lens[1:]:
        j = j + 1
        if not i == lens[0]:
            print("WARNING: inconsistent length on read %i: %i" % (j, i))

    return diff

def test_timeout(url):
    global DEBUG
    dbbackup = DEBUG
    class FakeLogger(object):
        def debug(self, msg, *args):
            print(msg % args)
        info = warning = error = debug
    DEBUG = FakeLogger()
    print("  fetching the file to establish a connection")
    fo = urlreq.urlopen(url)
    data1 = fo.read()
    fo.close()

    i = 20
    print("  waiting %i seconds for the server to close the connection" % i)
    while i > 0:
        procutil.stdout.write('\r  %2i' % i)
        procutil.stdout.flush()
        time.sleep(1)
        i -= 1
    procutil.stderr.write('\r')

    print("  fetching the file a second time")
    fo = urlreq.urlopen(url)
    data2 = fo.read()
    fo.close()

    if data1 == data2:
        print('  data are identical')
    else:
        print('  ERROR: DATA DIFFER')

    DEBUG = dbbackup


def test(url, N=10):
    print("performing continuity test (making sure stuff isn't corrupted)")
    continuity(url)
    print('')
    print("performing speed comparison")
    comp(N, url)
    print('')
    print("performing dropped-connection check")
    test_timeout(url)

if __name__ == '__main__':
    import time
    try:
        N = int(sys.argv[1])
        url = sys.argv[2]
    except (IndexError, ValueError):
        print("%s <integer> <url>" % sys.argv[0])
    else:
        test(url, N)
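
# Editorial note, not part of the original file: run as a script, the block
# above exercises the continuity, speed-comparison, and dropped-connection
# checks in test(), e.g. (with an illustrative URL):
#
#     python keepalive.py 10 http://example.com/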