##// END OF EJS Templates
Do not rely on the md5 hash of the file; compare the file contents when searching for duplicates
Do not rely on the md5 hash of the file; compare the file contents when searching for duplicates

File last commit:

r1811:c2aa90c2 default
r1824:d33ed39f default
Show More
downloaders.py
101 lines | 2.6 KiB | text/x-python | PythonLexer
import re
import requests
from django.core.files.uploadedfile import TemporaryUploadedFile
from pytube import YouTube
from boards.utils import validate_file_size
# Format name passed to pytube's filter() when choosing a YouTube stream.
YOUTUBE_VIDEO_FORMAT = 'webm'

# The only HTTP status for which a response body is saved.
HTTP_RESULT_OK = 200

# Names of the response headers inspected before and during a download.
HEADER_CONTENT_LENGTH = 'content-length'
HEADER_CONTENT_TYPE = 'content-type'

# Number of bytes requested per chunk while streaming a response body.
FILE_DOWNLOAD_CHUNK_BYTES = 200000

# Matches both the long (youtube.com/watch?v=...) and short (youtu.be/...)
# link forms. Every dot in the host names is escaped so that it matches a
# literal '.' only, not an arbitrary character.
REGEX_YOUTUBE_URL = re.compile(
    r'https?://((www\.)?youtube\.com/watch\?v=|youtu\.be/)[-\w]+')

# Matches magnet links. The match is case-insensitive because info-hashes
# are commonly written in upper-case hex or Base32 as well as lower case.
REGEX_MAGNET = re.compile(
    r'magnet:\?xt=urn:(btih:)?[a-z0-9]{20,50}.*', re.IGNORECASE)

# Content types that are web pages rather than files; URLs serving them are
# not downloaded.
TYPE_URL_ONLY = (
    'application/xhtml+xml',
    'text/html',
)
class Downloader:
    """Generic HTTP downloader; the fallback that accepts every URL."""

    @staticmethod
    def handles(url: str) -> bool:
        """Return True for any URL, since this is the catch-all downloader."""
        return True

    @staticmethod
    def download(url: str):
        """Fetch ``url`` into a temporary uploaded file.

        A HEAD request is made first to inspect the content type and, when
        present, the declared content length. The body is then streamed in
        chunks, with the accumulated size checked after every chunk.

        Returns:
            A ``TemporaryUploadedFile`` holding the response body, or
            ``None`` when the content type is listed in ``TYPE_URL_ONLY``
            or the GET response status is not ``HTTP_RESULT_OK``.

        Raises:
            Whatever ``validate_file_size`` raises for a rejected size
            (presumably when the configured limit is exceeded), and any
            error raised by ``requests``.
        """
        # NOTE(review): verify=False disables TLS certificate validation and
        # no timeout is passed, so a stalled server can block this call
        # indefinitely. Both are kept as-is to preserve behaviour; confirm
        # whether they are intentional.
        response_head = requests.head(url, verify=False)

        # A server may omit Content-Type; treat that as opaque binary data
        # instead of failing with a KeyError.
        raw_type = response_head.headers.get(HEADER_CONTENT_TYPE)
        if not raw_type:
            raw_type = 'application/octet-stream'
        # Drop parameters such as "; charset=utf-8" and normalise spacing
        # and case so that the TYPE_URL_ONLY comparison is reliable.
        content_type = raw_type.split(';')[0].strip().lower()
        if content_type in TYPE_URL_ONLY:
            return None

        # Reject early when the server declares a size that is not allowed.
        length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
        if length_header:
            validate_file_size(int(length_header))

        response = requests.get(url, verify=False, stream=True)
        try:
            if response.status_code != HTTP_RESULT_OK:
                return None

            # Set a dummy file name that will be replaced anyway, just keep
            # the valid extension. Fall back to the whole type string if the
            # server sent a type without a '/'.
            type_parts = content_type.split('/')
            extension = type_parts[1] if len(type_parts) > 1 else type_parts[0]
            filename = 'file.' + extension

            # NOTE(review): the size argument stays 0 and the file is
            # returned positioned after the last written byte; presumably
            # callers rewind and measure it themselves - confirm.
            temp_file = TemporaryUploadedFile(
                filename, content_type, 0, None, None)
            try:
                # Download file, stop if the size exceeds limit. The declared
                # Content-Length cannot be trusted, so count actual bytes.
                size = 0
                for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
                    size += len(chunk)
                    validate_file_size(size)
                    temp_file.write(chunk)
            except Exception:
                # Do not leave a partially written temporary file open when
                # the download is rejected or fails midway.
                temp_file.close()
                raise
            return temp_file
        finally:
            # With stream=True the connection stays checked out until the
            # response is closed or fully consumed.
            response.close()
class YouTubeDownloader(Downloader):
    """Downloader for YouTube links, resolved to a stream URL via pytube."""

    @staticmethod
    def handles(url: str) -> bool:
        """Return True when ``url`` matches ``REGEX_YOUTUBE_URL``."""
        matched = REGEX_YOUTUBE_URL.match(url)
        return matched is not None

    @staticmethod
    def download(url: str):
        """Download the first stream in ``YOUTUBE_VIDEO_FORMAT`` for ``url``.

        Returns whatever ``Downloader.download`` returns for the chosen
        stream URL, or ``None`` when no stream in that format is available.
        """
        tube = YouTube()
        tube.from_url(url)

        candidates = tube.filter(YOUTUBE_VIDEO_FORMAT)
        if len(candidates) == 0:
            return None

        # Take the first matching stream and fetch it as a plain HTTP file.
        first_stream = candidates[0]
        return Downloader.download(first_stream.url)
class NothingDownloader(Downloader):
    """Downloader for magnet links: claims the URL but fetches nothing."""

    @staticmethod
    def handles(url: str) -> bool:
        """Return True when ``url`` matches ``REGEX_MAGNET``."""
        # Compare against None so that a real bool is returned, as the
        # annotation promises, rather than a match object or None.
        return REGEX_MAGNET.match(url) is not None

    @staticmethod
    def download(url: str):
        """Return ``None``; no request is made for a magnet link."""
        return None
# Downloader classes in lookup order. download() uses the first class whose
# handles() accepts the URL, and Downloader accepts every URL, so it has to
# stay last.
DOWNLOADERS = (YouTubeDownloader, NothingDownloader, Downloader)
def download(url):
    """Download ``url`` with the first entry of ``DOWNLOADERS`` that handles it.

    Returns the chosen downloader's result. Raises ``Exception`` when no
    registered downloader accepts the URL.
    """
    chosen = next(
        (candidate for candidate in DOWNLOADERS if candidate.handles(url)),
        None,
    )
    if chosen is None:
        raise Exception('No downloader supports this URL.')
    return chosen.download(url)