import os
import re

from django.core.files.uploadedfile import SimpleUploadedFile, \
    TemporaryUploadedFile
from pytube import YouTube
import requests

from boards.utils import validate_file_size


YOUTUBE_VIDEO_FORMAT = 'webm'

HTTP_RESULT_OK = 200

HEADER_CONTENT_LENGTH = 'content-length'
HEADER_CONTENT_TYPE = 'content-type'

FILE_DOWNLOAD_CHUNK_BYTES = 200000

# The dot in "youtu.be" is escaped so that it matches only a literal dot.
YOUTUBE_URL = re.compile(
    r'https?://((www\.)?youtube\.com/watch\?v=|youtu\.be/)[-\w]+')


class Downloader:
    """Generic HTTP downloader, used when no specific subclass handles a URL.

    Subclasses override ``handles`` to claim the URLs they understand and
    ``download`` to fetch them.
    """

    @staticmethod
    def handles(url: str) -> bool:
        """Return whether this downloader is specific to ``url``.

        The generic downloader never claims a URL; it is only used as the
        fallback.
        """
        return False

    @staticmethod
    def download(url: str):
        """Download ``url`` into a temporary uploaded file.

        The content type and (when present) the declared content length are
        read from a HEAD request first; the body is then streamed in chunks
        of ``FILE_DOWNLOAD_CHUNK_BYTES`` while the accumulated size is passed
        to ``validate_file_size`` after every chunk.

        Returns the ``TemporaryUploadedFile`` holding the body, or ``None``
        when the GET response status is not ``HTTP_RESULT_OK``.

        Raises ``KeyError`` if the HEAD response has no content-type header
        and ``IndexError`` if that content type contains no ``/``. Whatever
        ``validate_file_size`` raises is propagated (presumably a validation
        error for oversized files -- confirm against ``boards.utils``).
        """
        # NOTE(review): verify=False disables TLS certificate verification
        # for both requests -- confirm this is intentional.

        # Verify content headers. Redirects are followed explicitly because
        # requests.head() does not follow them by default, while the GET
        # below does; without this the headers checked here could belong to
        # the redirect response instead of the resource actually downloaded.
        response_head = requests.head(url, verify=False, allow_redirects=True)
        content_type = response_head.headers[HEADER_CONTENT_TYPE].split(';')[0]

        length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
        if length_header:
            validate_file_size(int(length_header))

        # Set a dummy file name that will be replaced anyway, just keep the
        # valid extension
        filename = 'file.' + content_type.split('/')[1]

        response = requests.get(url, verify=False, stream=True)
        try:
            # Reject failed responses before streaming their body to disk.
            if response.status_code != HTTP_RESULT_OK:
                return None

            # The size argument is passed as 0, as in the original code.
            uploaded = TemporaryUploadedFile(filename, content_type, 0, None,
                                             None)
            try:
                # Download file, stop if the size exceeds limit
                size = 0
                for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
                    size += len(chunk)
                    validate_file_size(size)
                    uploaded.write(chunk)
            except Exception:
                # Do not leave the temporary file open when the download is
                # aborted half-way.
                uploaded.close()
                raise

            return uploaded
        finally:
            # A streamed response keeps its connection until it is closed.
            response.close()


def download(url: str):
    """Download ``url`` with the first downloader that handles it.

    Every direct subclass of ``Downloader`` is asked via ``handles``; if none
    of them claims the URL, the generic ``Downloader.download`` is used.
    Returns whatever the chosen downloader returns.
    """
    for downloader in Downloader.__subclasses__():
        if downloader.handles(url):
            return downloader.download(url)

    # If nobody of the specific downloaders handles this, use generic
    # one
    return Downloader.download(url)


class YouTubeDownloader(Downloader):
    """Downloader for YouTube watch URLs matched by ``YOUTUBE_URL``."""

    @staticmethod
    def download(url: str):
        """Download the first ``YOUTUBE_VIDEO_FORMAT`` video found for ``url``.

        The video's direct URL is resolved through pytube and then fetched
        with the generic ``Downloader.download``. Returns ``None`` when
        pytube reports no video in the requested format.
        """
        yt = YouTube()
        yt.from_url(url)
        videos = yt.filter(YOUTUBE_VIDEO_FORMAT)
        if not videos:
            return None

        return Downloader.download(videos[0].url)

    @staticmethod
    def handles(url: str) -> bool:
        """Return ``True`` if ``url`` starts with a recognised YouTube URL."""
        # re.match() yields a match object or None; convert it so the result
        # agrees with the declared bool return type.
        return YOUTUBE_URL.match(url) is not None