keepalive: attempt to fix issue1003...
Matt Mackall
r8146:4f13ed6e default
@@ -1,640 +1,653 @@
1 1 # This library is free software; you can redistribute it and/or
2 2 # modify it under the terms of the GNU Lesser General Public
3 3 # License as published by the Free Software Foundation; either
4 4 # version 2.1 of the License, or (at your option) any later version.
5 5 #
6 6 # This library is distributed in the hope that it will be useful,
7 7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
9 9 # Lesser General Public License for more details.
10 10 #
11 11 # You should have received a copy of the GNU Lesser General Public
12 12 # License along with this library; if not, write to the
13 13 # Free Software Foundation, Inc.,
14 14 # 59 Temple Place, Suite 330,
15 15 # Boston, MA 02111-1307 USA
16 16
17 17 # This file is part of urlgrabber, a high-level cross-protocol url-grabber
18 18 # Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko
19 19
20 20 # Modified by Benoit Boissinot:
21 21 # - fix for digest auth (inspired from urllib2.py @ Python v2.4)
22 22 # Modified by Dirkjan Ochtman:
23 23 # - import md5 function from a local util module
24 24
25 25 """An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.
26 26
27 27 >>> import urllib2
28 28 >>> from keepalive import HTTPHandler
29 29 >>> keepalive_handler = HTTPHandler()
30 30 >>> opener = urllib2.build_opener(keepalive_handler)
31 31 >>> urllib2.install_opener(opener)
32 32 >>>
33 33 >>> fo = urllib2.urlopen('http://www.python.org')
34 34
35 35 If a connection to a given host is requested, and all of the existing
36 36 connections are still in use, another connection will be opened. If
37 37 the handler tries to use an existing connection but it fails in some
38 38 way, it will be closed and removed from the pool.
39 39
40 40 To remove the handler, simply re-run build_opener with no arguments, and
41 41 install that opener.
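
For example, this reinstalls urllib2's default handlers:

>>> urllib2.install_opener(urllib2.build_opener())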
42 42
43 43 You can explicitly close connections by using the close_connection()
44 44 method of the returned file-like object (described below) or you can
45 45 use the handler methods:
46 46
47 47 close_connection(host)
48 48 close_all()
49 49 open_connections()
50 50
51 51 NOTE: using the close_connection and close_all methods of the handler
52 52 should be done with care when using multiple threads.
53 53 * there is nothing that prevents another thread from creating new
54 54 connections immediately after connections are closed
55 55 * no checks are done to prevent in-use connections from being closed
56 56
57 57 >>> keepalive_handler.close_all()
58 58
59 59 EXTRA ATTRIBUTES AND METHODS
60 60
61 61 Upon a status of 200, the object returned has a few additional
62 62 attributes and methods, which should not be used if you want to
63 63 remain consistent with the normal urllib2-returned objects:
64 64
65 65 close_connection() - close the connection to the host
66 66 readlines() - you know, readlines()
67 67 status - the return status (i.e. 404)
68 68 reason - English translation of status (i.e. 'File not found')
69 69
70 70 If you want the best of both worlds, use this inside an
71 71 AttributeError-catching try:
72 72
73 73 >>> try: status = fo.status
74 74 ... except AttributeError: status = None
75 75
76 76 Unfortunately, these are ONLY there if status == 200, so it's not
77 77 easy to distinguish between non-200 responses. The reason is that
78 78 urllib2 tries to do clever things with error codes 301, 302, 401,
79 79 and 407, and it wraps the object upon return.
80 80
81 81 For python versions earlier than 2.4, you can avoid this fancy error
82 82 handling by setting the module-level global HANDLE_ERRORS to zero.
83 83 You see, prior to 2.4, it's the HTTP Handler's job to determine what
84 84 to handle specially, and what to just pass up. HANDLE_ERRORS == 0
85 85 means "pass everything up". In python 2.4, however, this job no
86 86 longer belongs to the HTTP Handler and is now done by a NEW handler,
87 87 HTTPErrorProcessor. Here's the bottom line:
88 88
89 89 python version < 2.4
90 90 HANDLE_ERRORS == 1 (default) pass up 200, treat the rest as
91 91 errors
92 92 HANDLE_ERRORS == 0 pass everything up, error processing is
93 93 left to the calling code
94 94 python version >= 2.4
95 95 HANDLE_ERRORS == 1 pass up 200, treat the rest as errors
96 96 HANDLE_ERRORS == 0 (default) pass everything up, let the
97 97 other handlers (specifically,
98 98 HTTPErrorProcessor) decide what to do
99 99
100 100 In practice, setting the variable either way makes little difference
101 101 in python 2.4, so for the most consistent behavior across versions,
102 102 you probably just want to use the defaults, which will give you
103 103 exceptions on errors.
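
For example, to force the "pass everything up" behavior on any Python
version (assuming this module is imported under the name 'keepalive'):

>>> import keepalive
>>> keepalive.HANDLE_ERRORS = 0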
104 104
105 105 """
106 106
107 107 # $Id: keepalive.py,v 1.14 2006/04/04 21:00:32 mstenner Exp $
108 108
109 109 import urllib2
110 110 import httplib
111 111 import socket
112 112 import thread
113 113
114 114 DEBUG = None
115 115
116 116 import sys
117 117 if sys.version_info < (2, 4): HANDLE_ERRORS = 1
118 118 else: HANDLE_ERRORS = 0
119 119
120 120 class ConnectionManager:
121 121 """
122 122 The connection manager must be able to:
123 123 * keep track of all existing connections
124 124 """
125 125 def __init__(self):
126 126 self._lock = thread.allocate_lock()
127 127 self._hostmap = {} # map hosts to a list of connections
128 128 self._connmap = {} # map connections to host
129 129 self._readymap = {} # map connection to ready state
130 130
131 131 def add(self, host, connection, ready):
132 132 self._lock.acquire()
133 133 try:
134 134 if host not in self._hostmap: self._hostmap[host] = []
135 135 self._hostmap[host].append(connection)
136 136 self._connmap[connection] = host
137 137 self._readymap[connection] = ready
138 138 finally:
139 139 self._lock.release()
140 140
141 141 def remove(self, connection):
142 142 self._lock.acquire()
143 143 try:
144 144 try:
145 145 host = self._connmap[connection]
146 146 except KeyError:
147 147 pass
148 148 else:
149 149 del self._connmap[connection]
150 150 del self._readymap[connection]
151 151 self._hostmap[host].remove(connection)
152 152 if not self._hostmap[host]: del self._hostmap[host]
153 153 finally:
154 154 self._lock.release()
155 155
156 156 def set_ready(self, connection, ready):
157 157 try: self._readymap[connection] = ready
158 158 except KeyError: pass
159 159
160 160 def get_ready_conn(self, host):
161 161 conn = None
162 162 self._lock.acquire()
163 163 try:
164 164 if host in self._hostmap:
165 165 for c in self._hostmap[host]:
166 166 if self._readymap[c]:
167 167 self._readymap[c] = 0
168 168 conn = c
169 169 break
170 170 finally:
171 171 self._lock.release()
172 172 return conn
173 173
174 174 def get_all(self, host=None):
175 175 if host:
176 176 return list(self._hostmap.get(host, []))
177 177 else:
178 178 return dict(self._hostmap)
179 179
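# A minimal usage sketch for ConnectionManager (illustrative only, not
# executed by this module; any hashable object can stand in for a
# connection):
#
#   cm = ConnectionManager()
#   conn = object()
#   cm.add('example.com:80', conn, ready=1)
#   assert cm.get_ready_conn('example.com:80') is conn  # now marked busy
#   assert cm.get_ready_conn('example.com:80') is None  # nothing ready
#   cm.set_ready(conn, 1)  # hand it back to the pool
#   cm.remove(conn)
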
180 180 class KeepAliveHandler:
181 181 def __init__(self):
182 182 self._cm = ConnectionManager()
183 183
184 184 #### Connection Management
185 185 def open_connections(self):
186 186 """return a list of connected hosts and the number of connections
187 187 to each. [('foo.com:80', 2), ('bar.org', 1)]"""
188 188 return [(host, len(li)) for (host, li) in self._cm.get_all().items()]
189 189
190 190 def close_connection(self, host):
191 191 """close connection(s) to <host>
192 192 host is the host:port spec, e.g. 'www.cnn.com:8080', as passed in.
193 193 No error occurs if there is no connection to that host.
194 194 for h in self._cm.get_all(host):
195 195 self._cm.remove(h)
196 196 h.close()
197 197
198 198 def close_all(self):
199 199 """close all open connections"""
200 200 for host, conns in self._cm.get_all().iteritems():
201 201 for h in conns:
202 202 self._cm.remove(h)
203 203 h.close()
204 204
205 205 def _request_closed(self, request, host, connection):
206 206 """tells us that this request is now closed and the the
207 207 connection is ready for another request"""
208 208 self._cm.set_ready(connection, 1)
209 209
210 210 def _remove_connection(self, host, connection, close=0):
211 211 if close: connection.close()
212 212 self._cm.remove(connection)
213 213
214 214 #### Transaction Execution
215 215 def http_open(self, req):
216 216 return self.do_open(HTTPConnection, req)
217 217
218 218 def do_open(self, http_class, req):
219 219 host = req.get_host()
220 220 if not host:
221 221 raise urllib2.URLError('no host given')
222 222
223 223 try:
224 224 h = self._cm.get_ready_conn(host)
225 225 while h:
226 226 r = self._reuse_connection(h, req, host)
227 227
228 228 # if this response is non-None, then it worked and we're
229 229 # done. Break out, skipping the else block.
230 230 if r: break
231 231
232 232 # connection is bad - possibly closed by server
233 233 # discard it and ask for the next free connection
234 234 h.close()
235 235 self._cm.remove(h)
236 236 h = self._cm.get_ready_conn(host)
237 237 else:
238 238 # no (working) free connections were found. Create a new one.
239 239 h = http_class(host)
240 240 if DEBUG: DEBUG.info("creating new connection to %s (%d)",
241 241 host, id(h))
242 242 self._cm.add(host, h, 0)
243 243 self._start_transaction(h, req)
244 244 r = h.getresponse()
245 245 except (socket.error, httplib.HTTPException), err:
246 246 raise urllib2.URLError(err)
247 247
248 248 # if not a persistent connection, don't try to reuse it
249 249 if r.will_close: self._cm.remove(h)
250 250
251 251 if DEBUG: DEBUG.info("STATUS: %s, %s", r.status, r.reason)
252 252 r._handler = self
253 253 r._host = host
254 254 r._url = req.get_full_url()
255 255 r._connection = h
256 256 r.code = r.status
257 257 r.headers = r.msg
258 258 r.msg = r.reason
259 259
260 260 if r.status == 200 or not HANDLE_ERRORS:
261 261 return r
262 262 else:
263 263 return self.parent.error('http', req, r,
264 264 r.status, r.msg, r.headers)
265 265
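# From the caller's side, the pooling in do_open() means back-to-back
# fetches from one host should reuse a single socket. A hedged sketch
# (network access and the host are assumptions; HTTPHandler is the
# subclass defined near the end of this module):
#
#   from keepalive import HTTPHandler
#   opener = urllib2.build_opener(HTTPHandler())
#   opener.open('http://example.com/').read()
#   opener.open('http://example.com/').read()  # should reuse the conn
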
266 266 def _reuse_connection(self, h, req, host):
267 267 """start the transaction with a re-used connection
268 268 return a response object (r) upon success or None on failure.
269 269 This does NOT close or remove bad connections in cases where
270 270 it returns. However, if an unexpected exception occurs, it
271 271 will close and remove the connection before re-raising.
272 272 """
273 273 try:
274 274 self._start_transaction(h, req)
275 275 r = h.getresponse()
276 276 # note: just because we got something back doesn't mean it
277 277 # worked. We'll check the version below, too.
278 278 except (socket.error, httplib.HTTPException):
279 279 r = None
280 280 except:
281 281 # adding this block just in case we've missed
282 282 # something. We will still raise the exception, but
283 283 # lets try and close the connection and remove it
284 284 # first. We previously got into a nasty loop
285 285 # where an exception was uncaught, and so the
286 286 # connection stayed open. On the next try, the
287 287 # same exception was raised, etc. The tradeoff is
288 288 # that it's now possible this call will raise
289 289 # a DIFFERENT exception
290 290 if DEBUG: DEBUG.error("unexpected exception - closing " + \
291 291 "connection to %s (%d)", host, id(h))
292 292 self._cm.remove(h)
293 293 h.close()
294 294 raise
295 295
296 296 if r is None or r.version == 9:
297 297 # httplib falls back to assuming HTTP 0.9 if it gets a
298 298 # bad header back. This is most likely to happen if
299 299 # the socket has been closed by the server since we
300 300 # last used the connection.
301 301 if DEBUG: DEBUG.info("failed to re-use connection to %s (%d)",
302 302 host, id(h))
303 303 r = None
304 304 else:
305 305 if DEBUG: DEBUG.info("re-using connection to %s (%d)", host, id(h))
306 306
307 307 return r
308 308
309 309 def _start_transaction(self, h, req):
310 headers = req.headers.copy()
311 body = req.data
312 if sys.version_info >= (2, 4):
313 headers.update(req.unredirected_hdrs)
314 310 try:
315 h.request(req.get_method(), req.get_selector(), body, headers)
316 except socket.error, err: # XXX what error?
311 if req.has_data():
312 data = req.get_data()
313 h.putrequest('POST', req.get_selector())
314 if 'Content-type' not in req.headers:
315 h.putheader('Content-type',
316 'application/x-www-form-urlencoded')
317 if 'Content-length' not in req.headers:
318 h.putheader('Content-length', '%d' % len(data))
319 else:
320 h.putrequest('GET', req.get_selector())
321 except (socket.error), err:
317 322 raise urllib2.URLError(err)
318 323
324 for args in self.parent.addheaders:
325 h.putheader(*args)
326 for k, v in req.headers.items():
327 h.putheader(k, v)
328 h.endheaders()
329 if req.has_data():
330 h.send(data)
331
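# For reference, the putrequest/putheader/endheaders/send calls above are
# httplib's piecewise request interface; a minimal standalone sketch
# (the host is illustrative):
#
#   import httplib
#   conn = httplib.HTTPConnection('example.com')
#   conn.putrequest('GET', '/')
#   conn.putheader('User-Agent', 'keepalive-demo')
#   conn.endheaders()
#   resp = conn.getresponse()
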
319 332 class HTTPHandler(KeepAliveHandler, urllib2.HTTPHandler):
320 333 pass
321 334
322 335 class HTTPResponse(httplib.HTTPResponse):
323 336 # we need to subclass HTTPResponse in order to
324 337 # 1) add readline() and readlines() methods
325 338 # 2) add close_connection() methods
326 339 # 3) add info() and geturl() methods
327 340
328 341 # in order to add readline(), read must be modified to deal with a
329 342 # buffer. example: readline must read a buffer and then spit back
330 343 # one line at a time. The only real alternative is to read one
331 344 # BYTE at a time (ick). Once something has been read, it can't be
332 345 # put back (ok, maybe it can, but that's even uglier than this),
333 346 # so if you THEN do a normal read, you must first take stuff from
334 347 # the buffer.
335 348
336 349 # the read method wraps the original to accommodate buffering,
337 350 # although read() never adds to the buffer.
338 351 # Both readline and readlines have been stolen with almost no
339 352 # modification from socket.py
340 353
341 354
342 355 def __init__(self, sock, debuglevel=0, strict=0, method=None):
343 356 if method: # the httplib in python 2.3 uses the method arg
344 357 httplib.HTTPResponse.__init__(self, sock, debuglevel, method)
345 358 else: # 2.2 doesn't
346 359 httplib.HTTPResponse.__init__(self, sock, debuglevel)
347 360 self.fileno = sock.fileno
348 361 self.code = None
349 362 self._rbuf = ''
350 363 self._rbufsize = 8096
351 364 self._handler = None # inserted by the handler later
352 365 self._host = None # (same)
353 366 self._url = None # (same)
354 367 self._connection = None # (same)
355 368
356 369 _raw_read = httplib.HTTPResponse.read
357 370
358 371 def close(self):
359 372 if self.fp:
360 373 self.fp.close()
361 374 self.fp = None
362 375 if self._handler:
363 376 self._handler._request_closed(self, self._host,
364 377 self._connection)
365 378
366 379 def close_connection(self):
367 380 self._handler._remove_connection(self._host, self._connection, close=1)
368 381 self.close()
369 382
370 383 def info(self):
371 384 return self.headers
372 385
373 386 def geturl(self):
374 387 return self._url
375 388
376 389 def read(self, amt=None):
377 390 # the _rbuf test is only in this first if for speed. It's not
378 391 # logically necessary
379 392 if self._rbuf and amt is not None:
380 393 L = len(self._rbuf)
381 394 if amt > L:
382 395 amt -= L
383 396 else:
384 397 s = self._rbuf[:amt]
385 398 self._rbuf = self._rbuf[amt:]
386 399 return s
387 400
388 401 s = self._rbuf + self._raw_read(amt)
389 402 self._rbuf = ''
390 403 return s
391 404
392 405 # stolen from Python SVN #68532 to fix issue1088
393 406 def _read_chunked(self, amt):
394 407 chunk_left = self.chunk_left
395 408 value = ''
396 409
397 410 # XXX This accumulates chunks by repeated string concatenation,
398 411 # which is not efficient as the number or size of chunks gets big.
399 412 while True:
400 413 if chunk_left is None:
401 414 line = self.fp.readline()
402 415 i = line.find(';')
403 416 if i >= 0:
404 417 line = line[:i] # strip chunk-extensions
405 418 try:
406 419 chunk_left = int(line, 16)
407 420 except ValueError:
408 421 # close the connection as protocol synchronisation is
409 422 # probably lost
410 423 self.close()
411 424 raise httplib.IncompleteRead(value)
412 425 if chunk_left == 0:
413 426 break
414 427 if amt is None:
415 428 value += self._safe_read(chunk_left)
416 429 elif amt < chunk_left:
417 430 value += self._safe_read(amt)
418 431 self.chunk_left = chunk_left - amt
419 432 return value
420 433 elif amt == chunk_left:
421 434 value += self._safe_read(amt)
422 435 self._safe_read(2) # toss the CRLF at the end of the chunk
423 436 self.chunk_left = None
424 437 return value
425 438 else:
426 439 value += self._safe_read(chunk_left)
427 440 amt -= chunk_left
428 441
429 442 # we read the whole chunk, get another
430 443 self._safe_read(2) # toss the CRLF at the end of the chunk
431 444 chunk_left = None
432 445
433 446 # read and discard trailer up to the CRLF terminator
434 447 ### note: we shouldn't have any trailers!
435 448 while True:
436 449 line = self.fp.readline()
437 450 if not line:
438 451 # a vanishingly small number of sites EOF without
439 452 # sending the trailer
440 453 break
441 454 if line == '\r\n':
442 455 break
443 456
444 457 # we read everything; close the "file"
445 458 self.close()
446 459
447 460 return value
448 461
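# For reference, a chunked body is framed on the wire as
# hex-size CRLF data CRLF ... 0 CRLF CRLF, which is what the loop above
# parses; for example:
#
#   '4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n'  decodes to  'Wikipedia'
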
449 462 def readline(self, limit=-1):
450 463 i = self._rbuf.find('\n')
451 464 while i < 0 and not (0 < limit <= len(self._rbuf)):
452 465 new = self._raw_read(self._rbufsize)
453 466 if not new: break
454 467 i = new.find('\n')
455 468 if i >= 0: i = i + len(self._rbuf)
456 469 self._rbuf = self._rbuf + new
457 470 if i < 0: i = len(self._rbuf)
458 471 else: i = i+1
459 472 if 0 <= limit < len(self._rbuf): i = limit
460 473 data, self._rbuf = self._rbuf[:i], self._rbuf[i:]
461 474 return data
462 475
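# The final split in readline() above is plain slicing; with a string
# standing in for _rbuf (illustration only):
#
#   rbuf = 'first line\nrest'
#   i = rbuf.find('\n') + 1
#   line, rbuf = rbuf[:i], rbuf[i:]  # 'first line\n', 'rest'
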
463 476 def readlines(self, sizehint=0):
464 477 total = 0
465 478 list = []
466 479 while 1:
467 480 line = self.readline()
468 481 if not line: break
469 482 list.append(line)
470 483 total += len(line)
471 484 if sizehint and total >= sizehint:
472 485 break
473 486 return list
474 487
475 488
476 489 class HTTPConnection(httplib.HTTPConnection):
477 490 # use the modified response class
478 491 response_class = HTTPResponse
479 492
480 493 #########################################################################
481 494 ##### TEST FUNCTIONS
482 495 #########################################################################
483 496
484 497 def error_handler(url):
485 498 global HANDLE_ERRORS
486 499 orig = HANDLE_ERRORS
487 500 keepalive_handler = HTTPHandler()
488 501 opener = urllib2.build_opener(keepalive_handler)
489 502 urllib2.install_opener(opener)
490 503 pos = {0: 'off', 1: 'on'}
491 504 for i in (0, 1):
492 505 print " fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
493 506 HANDLE_ERRORS = i
494 507 try:
495 508 fo = urllib2.urlopen(url)
496 509 fo.read()
497 510 fo.close()
498 511 try: status, reason = fo.status, fo.reason
499 512 except AttributeError: status, reason = None, None
500 513 except IOError, e:
501 514 print " EXCEPTION: %s" % e
502 515 raise
503 516 else:
504 517 print " status = %s, reason = %s" % (status, reason)
505 518 HANDLE_ERRORS = orig
506 519 hosts = keepalive_handler.open_connections()
507 520 print "open connections:", hosts
508 521 keepalive_handler.close_all()
509 522
510 523 def continuity(url):
511 524 from util import md5
512 525 format = '%25s: %s'
513 526
514 527 # first fetch the file with the normal http handler
515 528 opener = urllib2.build_opener()
516 529 urllib2.install_opener(opener)
517 530 fo = urllib2.urlopen(url)
518 531 foo = fo.read()
519 532 fo.close()
520 533 m = md5.new(foo)
521 534 print format % ('normal urllib', m.hexdigest())
522 535
523 536 # now install the keepalive handler and try again
524 537 opener = urllib2.build_opener(HTTPHandler())
525 538 urllib2.install_opener(opener)
526 539
527 540 fo = urllib2.urlopen(url)
528 541 foo = fo.read()
529 542 fo.close()
530 543 m = md5.new(foo)
531 544 print format % ('keepalive read', m.hexdigest())
532 545
533 546 fo = urllib2.urlopen(url)
534 547 foo = ''
535 548 while 1:
536 549 f = fo.readline()
537 550 if f: foo = foo + f
538 551 else: break
539 552 fo.close()
540 553 m = md5.new(foo)
541 554 print format % ('keepalive readline', m.hexdigest())
542 555
543 556 def comp(N, url):
544 557 print ' making %i connections to:\n %s' % (N, url)
545 558
546 559 sys.stdout.write(' first using the normal urllib handlers')
547 560 # first use normal opener
548 561 opener = urllib2.build_opener()
549 562 urllib2.install_opener(opener)
550 563 t1 = fetch(N, url)
551 564 print ' TIME: %.3f s' % t1
552 565
553 566 sys.stdout.write(' now using the keepalive handler ')
554 567 # now install the keepalive handler and try again
555 568 opener = urllib2.build_opener(HTTPHandler())
556 569 urllib2.install_opener(opener)
557 570 t2 = fetch(N, url)
558 571 print ' TIME: %.3f s' % t2
559 572 print ' improvement factor: %.2f' % (t1/t2, )
560 573
561 574 def fetch(N, url, delay=0):
562 575 import time
563 576 lens = []
564 577 starttime = time.time()
565 578 for i in range(N):
566 579 if delay and i > 0: time.sleep(delay)
567 580 fo = urllib2.urlopen(url)
568 581 foo = fo.read()
569 582 fo.close()
570 583 lens.append(len(foo))
571 584 diff = time.time() - starttime
572 585
573 586 j = 0
574 587 for i in lens[1:]:
575 588 j = j + 1
576 589 if not i == lens[0]:
577 590 print "WARNING: inconsistent length on read %i: %i" % (j, i)
578 591
579 592 return diff
580 593
581 594 def test_timeout(url):
582 595 global DEBUG
583 596 dbbackup = DEBUG
584 597 class FakeLogger:
585 598 def debug(self, msg, *args): print msg % args
586 599 info = warning = error = debug
587 600 DEBUG = FakeLogger()
588 601 print " fetching the file to establish a connection"
589 602 fo = urllib2.urlopen(url)
590 603 data1 = fo.read()
591 604 fo.close()
592 605
593 606 i = 20
594 607 print " waiting %i seconds for the server to close the connection" % i
595 608 while i > 0:
596 609 sys.stdout.write('\r %2i' % i)
597 610 sys.stdout.flush()
598 611 time.sleep(1)
599 612 i -= 1
600 613 sys.stderr.write('\r')
601 614
602 615 print " fetching the file a second time"
603 616 fo = urllib2.urlopen(url)
604 617 data2 = fo.read()
605 618 fo.close()
606 619
607 620 if data1 == data2:
608 621 print ' data are identical'
609 622 else:
610 623 print ' ERROR: DATA DIFFER'
611 624
612 625 DEBUG = dbbackup
613 626
614 627
615 628 def test(url, N=10):
616 629 print "checking error handler (do this on a non-200)"
617 630 try: error_handler(url)
618 631 except IOError:
619 632 print "exiting - exception will prevent further tests"
620 633 sys.exit()
621 634 print
622 635 print "performing continuity test (making sure stuff isn't corrupted)"
623 636 continuity(url)
624 637 print
625 638 print "performing speed comparison"
626 639 comp(N, url)
627 640 print
628 641 print "performing dropped-connection check"
629 642 test_timeout(url)
630 643
631 644 if __name__ == '__main__':
632 645 import time
633 646 import sys
634 647 try:
635 648 N = int(sys.argv[1])
636 649 url = sys.argv[2]
637 650 except:
638 651 print "%s <integer> <url>" % sys.argv[0]
639 652 else:
640 653 test(url, N)