"""Download a file referenced by URL into a Django temporary upload file.

A URL is offered to each class in ``DOWNLOADERS`` in order; the first one
whose ``handles`` accepts it performs the download.
"""

import re

import requests
from django.core.files.uploadedfile import TemporaryUploadedFile
from pytube import YouTube

from boards.utils import validate_file_size

YOUTUBE_VIDEO_FORMAT = 'webm'

HTTP_RESULT_OK = 200

HEADER_CONTENT_LENGTH = 'content-length'
HEADER_CONTENT_TYPE = 'content-type'

# Assumed when the server reports no usable content type.
DEFAULT_CONTENT_TYPE = 'application/octet-stream'

FILE_DOWNLOAD_CHUNK_BYTES = 200000

# The dot in "youtu.be" is escaped so it only matches a literal dot.
REGEX_YOUTUBE_URL = re.compile(
    r'https?://((www\.)?youtube\.com/watch\?v=|youtu\.be/)[-\w]+')
REGEX_MAGNET = re.compile(r'magnet:\?xt=urn:(btih:)?[a-z0-9]{20,50}.*')

# Content types for which nothing is downloaded when validation is on.
TYPE_URL_ONLY = (
    'application/xhtml+xml',
    'text/html',
)


class Downloader:
    """Generic HTTP downloader; accepts every URL."""

    @staticmethod
    def handles(url: str) -> bool:
        """Return True: this downloader is the catch-all fallback."""
        return True

    @staticmethod
    def download(url: str, validate):
        """Fetch ``url`` into a ``TemporaryUploadedFile``.

        :param url: address of the resource to fetch.
        :param validate: when true, skip resources whose content type is
            listed in ``TYPE_URL_ONLY`` and check the advertised
            content length before downloading.
        :return: the written ``TemporaryUploadedFile``, or ``None`` when the
            resource is skipped or the GET status is not 200.
        :raises: whatever ``validate_file_size`` raises for a rejected size
            (presumably when the size limit is exceeded -- defined in
            ``boards.utils``), plus any ``requests`` error.
        """
        # NOTE(review): verify=False disables TLS certificate verification
        # for both requests; kept as in the original, confirm it is intended.
        #
        # requests.head() does not follow redirects by default while
        # requests.get() does, so follow them explicitly to inspect the
        # headers of the same resource that is downloaded below.
        response_head = requests.head(url, verify=False, allow_redirects=True)

        # A missing or empty header must not crash the download.
        raw_type = response_head.headers.get(HEADER_CONTENT_TYPE) or ''
        content_type = (raw_type.split(';')[0].strip().lower()
                        or DEFAULT_CONTENT_TYPE)
        if validate and content_type in TYPE_URL_ONLY:
            return None

        length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
        if validate and length_header:
            validate_file_size(int(length_header))

        # Stream the body into a temporary upload file chunk by chunk.
        response = requests.get(url, verify=False, stream=True)
        try:
            if response.status_code != HTTP_RESULT_OK:
                return None

            # Set a dummy file name that will be replaced anyway, just keep
            # a valid extension. Fall back to the whole type when it has no
            # "/" separator instead of raising IndexError.
            type_parts = content_type.split('/')
            extension = type_parts[1] if len(type_parts) > 1 else type_parts[0]
            filename = 'file.' + extension

            file = TemporaryUploadedFile(filename, content_type, 0, None, None)
            try:
                # Download the file, stop if the size exceeds the limit.
                # NOTE(review): this check runs even when ``validate`` is
                # false, unlike the header check above -- confirm intended.
                size = 0
                for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
                    size += len(chunk)
                    validate_file_size(size)
                    file.write(chunk)
            except Exception:
                # Do not leave the temporary file open on a failed download.
                file.close()
                raise

            return file
        finally:
            # Release the streamed connection in every outcome.
            response.close()


class YouTubeDownloader(Downloader):
    """Downloads the video behind a YouTube watch/short URL."""

    @staticmethod
    def download(url: str, validate):
        """Download the first ``YOUTUBE_VIDEO_FORMAT`` video of ``url``.

        :return: the result of ``Downloader.download`` for the video URL,
            or ``None`` when no video in that format is available.
        """
        yt = YouTube()
        yt.from_url(url)
        videos = yt.filter(YOUTUBE_VIDEO_FORMAT)
        if not videos:
            return None

        return Downloader.download(videos[0].url, validate)

    @staticmethod
    def handles(url: str) -> bool:
        """Return True when ``url`` starts with a YouTube video address."""
        return REGEX_YOUTUBE_URL.match(url) is not None


class NothingDownloader(Downloader):
    """Claims magnet links so that nothing is downloaded for them."""

    @staticmethod
    def handles(url: str) -> bool:
        """Return True when ``url`` is a magnet link."""
        # Return a real bool, as annotated, rather than the match object.
        return REGEX_MAGNET.match(url) is not None

    @staticmethod
    def download(url: str, validate):
        """Return ``None``: there is no file to fetch for these URLs."""
        return None


# Order matters: specific downloaders first, the catch-all last.
DOWNLOADERS = (
    YouTubeDownloader,
    NothingDownloader,
    Downloader,
)


def download(url, validate=True):
    """Download ``url`` with the first downloader that handles it.

    :param url: address of the resource to fetch.
    :param validate: forwarded to the chosen downloader's ``download``.
    :return: whatever the chosen downloader returns (a file or ``None``).
    :raises Exception: if no downloader accepts the URL.
    """
    for downloader in DOWNLOADERS:
        if downloader.handles(url):
            return downloader.download(url, validate=validate)
    raise Exception('No downloader supports this URL.')