downloaders.py
101 lines
| 2.7 KiB
| text/x-python
|
PythonLexer
neko259
|
r1328 | import re | ||
neko259
|
r1765 | import requests | ||
from django.core.files.uploadedfile import TemporaryUploadedFile | ||||
neko259
|
r1328 | from pytube import YouTube | ||
from boards.utils import validate_file_size | ||||
# Video format passed to the pytube filter when picking a YouTube stream.
YOUTUBE_VIDEO_FORMAT = 'webm'

# Only responses with this HTTP status are downloaded.
HTTP_RESULT_OK = 200

# Response header names read from the HEAD request.
HEADER_CONTENT_LENGTH = 'content-length'
HEADER_CONTENT_TYPE = 'content-type'

# Chunk size, in bytes, used when streaming the response body to disk.
FILE_DOWNLOAD_CHUNK_BYTES = 200000

# Matches YouTube watch URLs and youtu.be short links. The dot in
# "youtu.be" is escaped so that it only matches a literal dot.
REGEX_YOUTUBE_URL = re.compile(
    r'https?://((www\.)?youtube\.com/watch\?v=|youtu\.be/)[-\w]+')
# Matches magnet links; these are recognized but never downloaded.
REGEX_MAGNET = re.compile(r'magnet:\?xt=urn:(btih:)?[a-z0-9]{20,50}.*')

# Content types for which a validated download returns None instead of
# fetching the body.
TYPE_URL_ONLY = (
    'application/xhtml+xml',
    'text/html',
)
neko259
|
r1328 | |||
class Downloader:
    """Generic HTTP downloader that accepts every URL.

    It streams the response body into a ``TemporaryUploadedFile``.
    """

    # Media type assumed when the server sends no usable Content-Type.
    _FALLBACK_CONTENT_TYPE = 'application/octet-stream'

    @staticmethod
    def handles(url: str) -> bool:
        """Return True for any URL; this downloader has no URL filter."""
        return True

    @staticmethod
    def _media_type(header_value):
        """Extract a normalized media type from a Content-Type header value.

        Parameters (such as ``; charset=...``) are dropped, surrounding
        whitespace is removed and the value is lowercased. A missing value,
        or one without a ``/`` separator, yields the fallback media type so
        that callers can always split it into type and subtype.
        """
        media_type = (header_value or '').split(';')[0].strip().lower()
        if '/' not in media_type:
            return Downloader._FALLBACK_CONTENT_TYPE
        return media_type

    @staticmethod
    def download(url: str, validate):
        """Download ``url`` into a temporary uploaded file.

        A HEAD request is sent first to read the content type and length.

        :param url: address of the resource to fetch.
        :param validate: when truthy, responses whose content type is listed
            in ``TYPE_URL_ONLY`` are skipped and the advertised content
            length is passed to ``validate_file_size`` before downloading.
        :return: a ``TemporaryUploadedFile`` with the response body, or
            ``None`` when the content type is skipped or the GET response
            status is not ``HTTP_RESULT_OK``.
        :raises: whatever ``validate_file_size`` raises (presumably on an
            oversized file -- confirm in ``boards.utils``) and whatever
            ``requests`` raises on network errors.
        """
        # NOTE(review): TLS certificate verification is disabled for both
        # requests below; confirm that this is intentional.
        response_head = requests.head(url, verify=False)
        # Use .get() so that a missing header does not raise KeyError.
        content_type = Downloader._media_type(
            response_head.headers.get(HEADER_CONTENT_TYPE))
        if validate and content_type in TYPE_URL_ONLY:
            return None

        length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
        if validate and length_header:
            validate_file_size(int(length_header))

        # Stream the body so that it is never held in memory as a whole.
        response = requests.get(url, verify=False, stream=True)
        try:
            if response.status_code != HTTP_RESULT_OK:
                return None

            # Set a dummy file name that will be replaced anyway, just keep
            # the valid extension.
            filename = 'file.' + content_type.split('/')[1]

            # NOTE(review): the size argument is 0 and is not updated after
            # the download -- confirm that callers do not rely on file.size.
            file = TemporaryUploadedFile(filename, content_type, 0, None, None)
            try:
                # The running total is checked after every chunk because the
                # Content-Length header may be absent.
                size = 0
                for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
                    size += len(chunk)
                    validate_file_size(size)
                    file.write(chunk)
            except Exception:
                # Do not leave a partially written temporary file open.
                file.close()
                raise

            return file
        finally:
            # Release the connection even when the download was aborted.
            response.close()
neko259
|
r1328 | |||
class YouTubeDownloader(Downloader):
    """Downloader for YouTube links.

    It resolves the link to a direct video URL through pytube and then
    delegates the transfer to ``Downloader.download``.
    """

    @staticmethod
    def download(url: str, validate):
        """Download the first available video for a YouTube page.

        :param url: YouTube page address.
        :param validate: forwarded unchanged to ``Downloader.download``.
        :return: the result of ``Downloader.download`` for the first video
            returned by the pytube filter, or ``None`` when the filter
            returns no videos.
        """
        yt = YouTube()
        yt.from_url(url)
        # Presumably keeps only streams in the given format -- confirm
        # against the pytube version in use.
        videos = yt.filter(YOUTUBE_VIDEO_FORMAT)
        if not videos:
            return None

        # ``validate`` is a required argument of Downloader.download;
        # omitting it raised TypeError on every call.
        return Downloader.download(videos[0].url, validate)

    @staticmethod
    def handles(url: str) -> bool:
        """Return True when ``url`` matches ``REGEX_YOUTUBE_URL``."""
        return REGEX_YOUTUBE_URL.match(url) is not None
neko259
|
r1500 | |||
neko259
|
r1683 | |||
class NothingDownloader(Downloader):
    """Downloader for magnet links, which are recognized but not fetched."""

    @staticmethod
    def handles(url: str) -> bool:
        """Return True when ``url`` matches ``REGEX_MAGNET``."""
        # Compare with None so that a real bool is returned, as annotated,
        # rather than a match object.
        return REGEX_MAGNET.match(url) is not None

    @staticmethod
    def download(url: str, validate):
        """Return None; nothing is downloaded for these URLs."""
        return None
neko259
|
r1811 | |||
# Downloaders in the order they are tried: the first one whose handles()
# accepts the URL is used.
DOWNLOADERS = (
    YouTubeDownloader,  # YouTube page links
    NothingDownloader,  # magnet links
    Downloader,  # accepts every URL, so it has to stay last
)
neko259
|
def download(url, validate=True):
    """Download ``url`` with the first downloader that accepts it.

    The entries of ``DOWNLOADERS`` are asked in order through ``handles``;
    the first one that accepts the URL performs the download.

    :param url: address to download.
    :param validate: forwarded to the chosen downloader as ``validate``.
    :return: whatever the chosen downloader's ``download`` returns.
    :raises Exception: when no downloader accepts the URL.
    """
    chosen = next(
        (candidate for candidate in DOWNLOADERS if candidate.handles(url)),
        None,
    )
    if chosen is None:
        raise Exception('No downloader supports this URL.')
    return chosen.download(url, validate=validate)