##// END OF EJS Templates
Merge pull request #1369 from minrk/EngineError...
Merge pull request #1369 from minrk/EngineError load header with engine id when engine dies in TaskScheduler This ensures that the metadata dict on the Client has the engine_uuid of the engine on which the task failed. Previously, this entry would remain empty. It is identical to code elsewhere (Hub, Client) for constructing the dummy reply when engines die.

File last commit:

r4910:0dc49390
r6098:0291d619 merge
Show More
fetchparse.py
97 lines | 2.8 KiB | text/x-python | PythonLexer
MinRK
update API after sagedays29...
r3664 """
An exceptionally lousy site spider
Ken Kinder <ken@kenkinder.com>
MinRK
remove kernel examples already ported to newparallel
r3675 Updated for newparallel by Min Ragan-Kelley <benjaminrk@gmail.com>
Bernardo B. Marques
remove all trailling spaces
r4872 This module gives an example of how the task interface to the
MinRK
update API after sagedays29...
r3664 IPython controller works. Before running this script start the IPython controller
and some engines using something like::
MinRK
remove kernel examples already ported to newparallel
r3675 ipclusterz start -n 4
MinRK
update API after sagedays29...
r3664 """
MinRK
remove kernel examples already ported to newparallel
r3675 import sys
from IPython.parallel import Client, error
MinRK
update API after sagedays29...
r3664 import time
MinRK
remove kernel examples already ported to newparallel
r3675 import BeautifulSoup # this isn't necessary, but it helps throw the dependency error earlier
MinRK
update API after sagedays29...
r3664
def fetchAndParse(url, data=None):
    """Fetch *url* and return a list of absolute links found in its HTML.

    This runs remotely on an IPython engine, so every import it needs
    lives inside the function body.  Any failure to open the URL is
    swallowed and simply yields an empty list (best-effort crawling).
    """
    import urllib2
    import urlparse
    import BeautifulSoup

    found = []
    try:
        page = urllib2.urlopen(url, data=data)
    except Exception:
        # Unreachable or broken URLs contribute no links.
        return found
    if page.headers.type == 'text/html':
        soup = BeautifulSoup.BeautifulSoup(page.read())
        for anchor in soup.findAll('a'):
            target = anchor.get('href', None)
            if target:
                found.append(urlparse.urljoin(url, target))
    return found
class DistributedSpider(object):
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 # Time to wait between polling for task results.
pollingDelay = 0.5
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 def __init__(self, site):
MinRK
remove kernel examples already ported to newparallel
r3675 self.client = Client()
self.view = self.client.load_balanced_view()
self.mux = self.client[:]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 self.allLinks = []
self.linksWorking = {}
self.linksDone = {}
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 self.site = site
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 def visitLink(self, url):
if url not in self.allLinks:
self.allLinks.append(url)
if url.startswith(self.site):
print ' ', url
MinRK
remove kernel examples already ported to newparallel
r3675 self.linksWorking[url] = self.view.apply(fetchAndParse, url)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
remove kernel examples already ported to newparallel
r3675 def onVisitDone(self, links, url):
MinRK
update API after sagedays29...
r3664 print url, ':'
self.linksDone[url] = None
del self.linksWorking[url]
MinRK
remove kernel examples already ported to newparallel
r3675 for link in links:
self.visitLink(link)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 def run(self):
self.visitLink(self.site)
while self.linksWorking:
print len(self.linksWorking), 'pending...'
self.synchronize()
time.sleep(self.pollingDelay)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 def synchronize(self):
MinRK
remove kernel examples already ported to newparallel
r3675 for url, ar in self.linksWorking.items():
MinRK
update API after sagedays29...
r3664 # Calling get_task_result with block=False will return None if the
# task is not done yet. This provides a simple way of polling.
MinRK
remove kernel examples already ported to newparallel
r3675 try:
links = ar.get(0)
except error.TimeoutError:
continue
except Exception as e:
self.linksDone[url] = None
del self.linksWorking[url]
print url, ':', e.traceback
else:
self.onVisitDone(links, url)
MinRK
update API after sagedays29...
r3664
def main():
    """Crawl the site named on the command line, or prompt for one."""
    if len(sys.argv) > 1:
        site = sys.argv[1]
    else:
        site = raw_input('Enter site to crawl: ')
    DistributedSpider(site).run()

if __name__ == '__main__':
    main()