##// END OF EJS Templates
Search for an image for every domain level, starting from the lowest one. Cache the result in memcached.
Search for an image for every domain level, starting from the lowest one. Cache the result in memcached.

File last commit:

r1765:7a6a61e1 default
r1772:1342675d default
Show More
downloaders.py
97 lines | 2.8 KiB | text/x-python | PythonLexer
import re
import requests
from django.core.files.uploadedfile import TemporaryUploadedFile
from pytube import YouTube
from boards.utils import validate_file_size
# Format name handed to pytube's ``filter`` when choosing a YouTube stream.
YOUTUBE_VIDEO_FORMAT = 'webm'

# HTTP status code of a successful response.
HTTP_RESULT_OK = 200

# Names of the response headers inspected before downloading.
HEADER_CONTENT_LENGTH = 'content-length'
HEADER_CONTENT_TYPE = 'content-type'

# Number of bytes requested per chunk while streaming a response body.
FILE_DOWNLOAD_CHUNK_BYTES = 200000

# Matches YouTube watch-page URLs and youtu.be short links.
# The dot in "youtu.be" is escaped so that it only matches a literal dot
# (unescaped it accepted any character, e.g. "youtuXbe/").
REGEX_YOUTUBE_URL = re.compile(
    r'https?://((www\.)?youtube\.com/watch\?v=|youtu\.be/)[-\w]+')

# Matches magnet links. The hash part accepts both letter cases because
# info-hashes are commonly written in upper-case hex or base32.
REGEX_MAGNET = re.compile(r'magnet:\?xt=urn:(btih:)?[a-zA-Z0-9]{20,50}.*')

# Content types claimed by NothingDownloader: for these no file is
# downloaded at all.
TYPE_URL_ONLY = (
    'application/xhtml+xml',
    'text/html',
)
class Downloader:
    """Generic HTTP downloader and base class for specialised downloaders.

    Subclasses override ``handles`` to claim the URLs they know how to
    process and ``download`` to fetch them. The module-level ``download``
    function falls back to this class when no subclass claims a URL.
    """

    # Media type assumed when the server sends no usable Content-Type
    # header (the fallback RFC 7231 allows a recipient to assume).
    DEFAULT_CONTENT_TYPE = 'application/octet-stream'

    @staticmethod
    def handles(url: str) -> bool:
        """Return whether this downloader claims *url*.

        The generic downloader never claims a URL.
        """
        return False

    @staticmethod
    def download(url: str):
        """Fetch *url* into a temporary uploaded file.

        The content type and the declared length are read from a HEAD
        request first; the body is then streamed in chunks of
        ``FILE_DOWNLOAD_CHUNK_BYTES`` while the running size is passed to
        ``validate_file_size`` (expected to raise once the limit is
        exceeded -- its implementation is not part of this module).

        Returns:
            A ``TemporaryUploadedFile`` holding the body, or ``None`` when
            the GET response status is not ``HTTP_RESULT_OK``.

        Raises:
            Whatever ``requests`` or ``validate_file_size`` raise; the
            temporary file and the connection are released first.
        """
        # NOTE(review): verify=False disables TLS certificate checks and no
        # timeout is passed, so a stalled server can block this call.
        # Both are kept as in the original; confirm they are intentional.
        #
        # requests.head() does not follow redirects by default while
        # requests.get() does, so redirects are enabled explicitly to make
        # the inspected headers describe the resource actually downloaded.
        response_head = requests.head(url, verify=False, allow_redirects=True)

        # A missing or empty Content-Type must not abort the download.
        raw_type = response_head.headers.get(HEADER_CONTENT_TYPE, '')
        content_type = (raw_type.split(';')[0].strip()
                        or Downloader.DEFAULT_CONTENT_TYPE)

        # Reject early when the server already declares an oversized body.
        length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
        if length_header:
            validate_file_size(int(length_header))

        response = requests.get(url, verify=False, stream=True)
        try:
            # Check the status before touching the body so that an error
            # page is neither downloaded nor written to a temporary file.
            if response.status_code != HTTP_RESULT_OK:
                return None

            # Dummy file name that will be replaced anyway; only the
            # extension matters. [-1] also copes with a slash-less type.
            filename = 'file.' + content_type.split('/')[-1]
            file = TemporaryUploadedFile(filename, content_type, 0, None, None)
            try:
                # Download the body, stopping once the size limit is hit.
                size = 0
                for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
                    size += len(chunk)
                    validate_file_size(size)
                    file.write(chunk)
            except Exception:
                # Do not leave a half-written temporary file behind.
                file.close()
                raise
            return file
        finally:
            # A streamed response keeps its connection until it is closed.
            response.close()
def download(url):
    """Download *url* with the first downloader that claims it.

    The direct subclasses of ``Downloader`` are asked, in the order
    ``Downloader.__subclasses__()`` yields them, whether they handle the
    URL. The first one that does performs the download; when none does,
    the generic ``Downloader`` is used.
    """
    chosen = next(
        (candidate for candidate in Downloader.__subclasses__()
         if candidate.handles(url)),
        Downloader,  # no specific downloader claimed the URL
    )
    return chosen.download(url)
class YouTubeDownloader(Downloader):
    """Downloader for YouTube links that resolves the video via pytube."""

    @staticmethod
    def handles(url: str) -> bool:
        """Return True when *url* matches ``REGEX_YOUTUBE_URL``."""
        return bool(REGEX_YOUTUBE_URL.match(url))

    @staticmethod
    def download(url: str):
        """Download the first ``YOUTUBE_VIDEO_FORMAT`` stream of the video.

        Returns whatever ``Downloader.download`` returns for the stream
        URL, or ``None`` when pytube reports no stream in that format.
        """
        tube = YouTube()
        tube.from_url(url)

        candidates = tube.filter(YOUTUBE_VIDEO_FORMAT)
        if not candidates:
            return None

        # Delegate the actual transfer to the generic downloader.
        return Downloader.download(candidates[0].url)
class NothingDownloader(Downloader):
    """Claims URLs for which no file should be fetched.

    ``download`` always returns ``None``; ``handles`` decides which URLs
    fall into this category.
    """

    @staticmethod
    def handles(url: str) -> bool:
        """Return True when *url* should not be downloaded as a file.

        That is the case for magnet links, for YouTube links, and for any
        URL whose HEAD response reports a content type listed in
        ``TYPE_URL_ONLY``.
        """
        # Decided from the URL alone, without any network access.
        if REGEX_MAGNET.match(url) or REGEX_YOUTUBE_URL.match(url):
            return True

        # NOTE(review): verify=False disables TLS certificate checks and no
        # timeout is passed; kept as in the original, confirm intentional.
        #
        # requests.head() does not follow redirects by default; follow them
        # so the content type describes the final resource rather than the
        # redirect response itself.
        response_head = requests.head(url, verify=False, allow_redirects=True)

        # A missing header yields '' (never in TYPE_URL_ONLY) instead of
        # raising KeyError. Media types are case-insensitive and may carry
        # surrounding whitespace, so normalise before comparing.
        raw_type = response_head.headers.get(HEADER_CONTENT_TYPE, '')
        content_type = raw_type.split(';')[0].strip().lower()
        return content_type in TYPE_URL_ONLY

    @staticmethod
    def download(url: str):
        """Return ``None``: nothing is downloaded for these URLs."""
        return None