##// END OF EJS Templates
client profile defaults to running IPython profile...
client profile defaults to running IPython profile also fix ArgumentError when Client is given positional arguments

File last commit:

r3666:a6a0636a
r4071:9cb5aa8e
Show More
fetchparse.py
97 lines | 2.9 KiB | text/x-python | PythonLexer
MinRK
update API after sagedays29...
r3664 """
An exceptionally lousy site spider
Ken Kinder <ken@kenkinder.com>
MinRK
remove kernel examples already ported to newparallel
r3675 Updated for newparallel by Min Ragan-Kelley <benjaminrk@gmail.com>
This module gives an example of how the task interface to the
MinRK
update API after sagedays29...
r3664 IPython controller works. Before running this script start the IPython controller
and some engines using something like::
MinRK
remove kernel examples already ported to newparallel
r3675 ipclusterz start -n 4
MinRK
update API after sagedays29...
r3664 """
MinRK
remove kernel examples already ported to newparallel
r3675 import sys
from IPython.parallel import Client, error
MinRK
update API after sagedays29...
r3664 import time
MinRK
remove kernel examples already ported to newparallel
r3675 import BeautifulSoup # this isn't necessary, but it helps throw the dependency error earlier
MinRK
update API after sagedays29...
r3664
def fetchAndParse(url, data=None):
MinRK
remove kernel examples already ported to newparallel
r3675 import urllib2
import urlparse
import BeautifulSoup
MinRK
update API after sagedays29...
r3664 links = []
try:
page = urllib2.urlopen(url, data=data)
except Exception:
return links
else:
if page.headers.type == 'text/html':
MinRK
remove kernel examples already ported to newparallel
r3675 doc = BeautifulSoup.BeautifulSoup(page.read())
for node in doc.findAll('a'):
href = node.get('href', None)
if href:
links.append(urlparse.urljoin(url, href))
MinRK
update API after sagedays29...
r3664 return links
class DistributedSpider(object):
# Time to wait between polling for task results.
pollingDelay = 0.5
def __init__(self, site):
MinRK
remove kernel examples already ported to newparallel
r3675 self.client = Client()
self.view = self.client.load_balanced_view()
self.mux = self.client[:]
MinRK
update API after sagedays29...
r3664
self.allLinks = []
self.linksWorking = {}
self.linksDone = {}
self.site = site
def visitLink(self, url):
if url not in self.allLinks:
self.allLinks.append(url)
if url.startswith(self.site):
print ' ', url
MinRK
remove kernel examples already ported to newparallel
r3675 self.linksWorking[url] = self.view.apply(fetchAndParse, url)
MinRK
update API after sagedays29...
r3664
MinRK
remove kernel examples already ported to newparallel
r3675 def onVisitDone(self, links, url):
MinRK
update API after sagedays29...
r3664 print url, ':'
self.linksDone[url] = None
del self.linksWorking[url]
MinRK
remove kernel examples already ported to newparallel
r3675 for link in links:
self.visitLink(link)
MinRK
update API after sagedays29...
r3664
def run(self):
self.visitLink(self.site)
while self.linksWorking:
print len(self.linksWorking), 'pending...'
self.synchronize()
time.sleep(self.pollingDelay)
def synchronize(self):
MinRK
remove kernel examples already ported to newparallel
r3675 for url, ar in self.linksWorking.items():
MinRK
update API after sagedays29...
r3664 # Calling get_task_result with block=False will return None if the
# task is not done yet. This provides a simple way of polling.
MinRK
remove kernel examples already ported to newparallel
r3675 try:
links = ar.get(0)
except error.TimeoutError:
continue
except Exception as e:
self.linksDone[url] = None
del self.linksWorking[url]
print url, ':', e.traceback
else:
self.onVisitDone(links, url)
MinRK
update API after sagedays29...
r3664
def main():
MinRK
remove kernel examples already ported to newparallel
r3675 if len(sys.argv) > 1:
site = sys.argv[1]
else:
site = raw_input('Enter site to crawl: ')
distributedSpider = DistributedSpider(site)
MinRK
update API after sagedays29...
r3664 distributedSpider.run()
if __name__ == '__main__':
main()