downloaders.py
95 lines
| 2.6 KiB
| text/x-python
|
PythonLexer
neko259
|
r1328 | import os | ||
import re | ||||
neko259
|
r1394 | from django.core.files.uploadedfile import SimpleUploadedFile, \ | ||
TemporaryUploadedFile | ||||
neko259
|
r1328 | from pytube import YouTube | ||
import requests | ||||
from boards.utils import validate_file_size | ||||
# Container format requested from pytube when picking a YouTube stream.
YOUTUBE_VIDEO_FORMAT = 'webm'

# HTTP status code that marks a successful download.
HTTP_RESULT_OK = 200

# Response header names read while validating a remote file.
HEADER_CONTENT_LENGTH = 'content-length'
HEADER_CONTENT_TYPE = 'content-type'

# Number of bytes requested per chunk while streaming a response body.
FILE_DOWNLOAD_CHUNK_BYTES = 200000

# Matches YouTube watch links and youtu.be short links. The dot in
# "youtu.be" is escaped so that it matches a literal dot only.
YOUTUBE_URL = re.compile(r'https?://((www\.)?youtube\.com/watch\?v=|youtu\.be/)[-\w]+')

# Content types that denote a web page: such URLs are kept as links and
# nothing is downloaded for them.
TYPE_URL_ONLY = (
    'application/xhtml+xml',
    'text/html',
)
neko259
|
r1328 | |||
class Downloader:
    """Generic downloader that streams a URL into a temporary uploaded file.

    Subclasses override ``handles`` to claim the URLs they know how to
    process and ``download`` to customise how those URLs are fetched.
    """

    @staticmethod
    def handles(url: str) -> bool:
        """Return whether this downloader claims *url*.

        The generic downloader never claims a URL; it is used as the
        fallback when no subclass does.
        """
        return False

    @staticmethod
    def download(url: str):
        """Download *url* and return it as a ``TemporaryUploadedFile``.

        A HEAD request is made first so that the declared content length
        can be checked before any body is transferred; the body is then
        streamed in chunks and the running size is re-checked after every
        chunk.

        Returns the file when the GET request answers with status 200,
        otherwise ``None``.

        Raises ``KeyError`` when the HEAD response carries no content-type
        header, and propagates whatever ``validate_file_size`` raises for
        an oversized file.
        """
        # NOTE(review): verify=False disables TLS certificate verification
        # for both requests; kept as in the original - confirm it is
        # intentional.
        # Verify content headers
        response_head = requests.head(url, verify=False)
        content_type = response_head.headers[HEADER_CONTENT_TYPE].split(';')[0]

        length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
        if length_header:
            validate_file_size(int(length_header))

        response = requests.get(url, verify=False, stream=True)
        try:
            # Check the status before reading the body so that an error
            # page is neither downloaded nor written to a temporary file.
            if response.status_code != HTTP_RESULT_OK:
                return None

            # Set a dummy file name that will be replaced
            # anyway, just keep the valid extension
            filename = 'file.' + content_type.split('/')[1]
            file = TemporaryUploadedFile(filename, content_type, 0, None, None)
            try:
                # Download file, stop if the size exceeds limit
                size = 0
                for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
                    size += len(chunk)
                    validate_file_size(size)
                    file.write(chunk)
            except Exception:
                # Do not leave a half-written temporary file behind.
                file.close()
                raise
            return file
        finally:
            # A streamed response holds its connection until it is closed.
            response.close()
neko259
|
r1328 | |||
neko259
|
def download(url):
    """Fetch *url* with the first specific downloader that claims it.

    The direct subclasses of ``Downloader`` are asked in turn whether they
    handle the URL; when none of them does, the generic ``Downloader`` is
    used instead.
    """
    specialized = next(
        (candidate for candidate in Downloader.__subclasses__()
         if candidate.handles(url)),
        None,
    )
    # If nobody of the specific downloaders handles this, use generic one
    chosen = Downloader if specialized is None else specialized
    return chosen.download(url)
neko259
|
class YouTubeDownloader(Downloader):
    """Downloader for URLs matching ``YOUTUBE_URL``."""

    @staticmethod
    def download(url: str):
        """Download the first ``YOUTUBE_VIDEO_FORMAT`` stream of the video.

        The stream URL reported by pytube is fetched with the generic
        ``Downloader.download``. Returns ``None`` when pytube offers no
        stream in the wanted format.
        """
        yt = YouTube()
        yt.from_url(url)
        videos = yt.filter(YOUTUBE_VIDEO_FORMAT)
        if not videos:
            return None
        return Downloader.download(videos[0].url)

    @staticmethod
    def handles(url: str) -> bool:
        """Return ``True`` when *url* starts with a YouTube video link."""
        # Convert the match object to a real bool, as the annotation says.
        return YOUTUBE_URL.match(url) is not None
neko259
|
r1500 | |||
neko259
|
r1683 | |||
class NothingDownloader(Downloader):
    """Downloader for URLs that point at a web page: nothing is fetched."""

    @staticmethod
    def handles(url: str) -> bool:
        """Return ``True`` when *url* serves one of ``TYPE_URL_ONLY``.

        URLs matching ``YOUTUBE_URL`` are never claimed. For any other URL
        a HEAD request is made and its content type is compared against
        ``TYPE_URL_ONLY``; a response without a content-type header is not
        claimed.
        """
        # Cheap local check first: no network round-trip is needed for a
        # URL that is excluded anyway.
        if YOUTUBE_URL.match(url):
            return False

        # NOTE(review): verify=False disables TLS certificate verification;
        # kept as in the original - confirm it is intentional.
        response_head = requests.head(url, verify=False)
        raw_type = response_head.headers.get(HEADER_CONTENT_TYPE, '')
        # Drop parameters such as "; charset=utf-8"; media types are
        # case-insensitive, so normalise before comparing.
        content_type = raw_type.split(';')[0].strip().lower()
        return content_type in TYPE_URL_ONLY

    @staticmethod
    def download(url: str):
        """Return ``None``: such URLs are kept as links, not downloaded."""
        return None