downloaders.py
83 lines
| 2.5 KiB
| text/x-python
|
PythonLexer
neko259
|
r1328 | import os | ||
import re | ||||
from django.core.files.uploadedfile import SimpleUploadedFile | ||||
from pytube import YouTube | ||||
import requests | ||||
from boards.utils import validate_file_size | ||||
YOUTUBE_VIDEO_FORMAT = 'webm' | ||||
HTTP_RESULT_OK = 200 | ||||
HEADER_CONTENT_LENGTH = 'content-length' | ||||
HEADER_CONTENT_TYPE = 'content-type' | ||||
FILE_DOWNLOAD_CHUNK_BYTES = 100000 | ||||
YOUTUBE_URL = re.compile(r'https?://www\.youtube\.com/watch\?v=\w+') | ||||
class Downloader: | ||||
@staticmethod | ||||
def handles(url: str) -> bool: | ||||
return False | ||||
@staticmethod | ||||
def download(url: str): | ||||
# Verify content headers | ||||
response_head = requests.head(url, verify=False) | ||||
content_type = response_head.headers[HEADER_CONTENT_TYPE].split(';')[0] | ||||
length_header = response_head.headers.get(HEADER_CONTENT_LENGTH) | ||||
if length_header: | ||||
length = int(length_header) | ||||
validate_file_size(length) | ||||
# Get the actual content into memory | ||||
response = requests.get(url, verify=False, stream=True) | ||||
# Download file, stop if the size exceeds limit | ||||
size = 0 | ||||
content = b'' | ||||
for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES): | ||||
size += len(chunk) | ||||
validate_file_size(size) | ||||
content += chunk | ||||
if response.status_code == HTTP_RESULT_OK and content: | ||||
# Set a dummy file name that will be replaced | ||||
# anyway, just keep the valid extension | ||||
filename = 'file.' + content_type.split('/')[1] | ||||
return SimpleUploadedFile(filename, content, content_type) | ||||
class YouTubeDownloader(Downloader): | ||||
@staticmethod | ||||
def download(url: str): | ||||
yt = YouTube() | ||||
yt.from_url(url) | ||||
videos = yt.filter(YOUTUBE_VIDEO_FORMAT) | ||||
if len(videos) > 0: | ||||
video = videos[0] | ||||
filename = '{}.{}'.format(video.filename, video.extension) | ||||
try: | ||||
video.download(on_progress=YouTubeDownloader.on_progress) | ||||
file = open(filename, 'rb') | ||||
content = file.read() | ||||
file.close() | ||||
os.remove(filename) | ||||
return SimpleUploadedFile(filename, content, video.extension) | ||||
except Exception as e: | ||||
if os.path.isfile(filename): | ||||
os.remove(filename) | ||||
raise e | ||||
@staticmethod | ||||
def handles(url: str) -> bool: | ||||
return YOUTUBE_URL.match(url) | ||||
@staticmethod | ||||
def on_progress(bytes, file_size, start): | ||||
validate_file_size(file_size) | ||||