import os
import re

from django.core.files.uploadedfile import SimpleUploadedFile
from pytube import YouTube
import requests

from boards.utils import validate_file_size

# Container format requested when filtering the streams pytube reports.
YOUTUBE_VIDEO_FORMAT = 'webm'

HTTP_RESULT_OK = 200

HEADER_CONTENT_LENGTH = 'content-length'
HEADER_CONTENT_TYPE = 'content-type'

# Chunk size (bytes) used when streaming the response body.
FILE_DOWNLOAD_CHUNK_BYTES = 100000

YOUTUBE_URL = re.compile(r'https?://www\.youtube\.com/watch\?v=\w+')


class Downloader:
    """Fetch a remote file over HTTP and wrap it as a Django upload.

    Subclasses override ``handles`` to claim the URLs they understand and
    ``download`` to implement a source-specific fetch.
    """

    @staticmethod
    def handles(url: str) -> bool:
        """Return whether this downloader claims *url*.

        The base implementation never claims a URL.
        """
        return False

    @staticmethod
    def download(url: str):
        """Download *url* into memory and return it as an uploaded file.

        A HEAD request is issued first to read the content type and, when
        the server reports one, to check the declared length before any
        body is transferred. The body is then streamed in chunks and the
        running size is passed to ``validate_file_size`` after every chunk.

        Returns:
            A ``SimpleUploadedFile`` named ``file.<subtype>``, or ``None``
            when the GET response is not 200 or the body is empty.

        Raises:
            KeyError: the HEAD response has no content-type header.
            IndexError: the content type contains no ``/`` separator.
            Whatever ``validate_file_size`` raises for a rejected size
            (presumably when the size exceeds the configured limit --
            verify against ``boards.utils``).
        """
        # NOTE(review): verify=False turns off TLS certificate validation
        # for both requests. Kept as-is to preserve behaviour; confirm that
        # this is intentional.
        response_head = requests.head(url, verify=False)
        # Drop parameters such as "; charset=utf-8" from the header value.
        content_type = response_head.headers[HEADER_CONTENT_TYPE].split(';')[0]

        length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
        if length_header:
            validate_file_size(int(length_header))

        response = requests.get(url, verify=False, stream=True)
        try:
            # Do not transfer the body of a response we would discard anyway.
            if response.status_code != HTTP_RESULT_OK:
                return None

            # The declared length may be absent or wrong, so the size is
            # re-validated while the body is being received.
            size = 0
            chunks = []
            for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
                size += len(chunk)
                validate_file_size(size)
                chunks.append(chunk)
        finally:
            # A streamed response holds its connection until it is closed.
            response.close()

        content = b''.join(chunks)
        if not content:
            return None

        # Set a dummy file name that will be replaced anyway, just keep
        # the valid extension.
        filename = 'file.' + content_type.split('/')[1]
        return SimpleUploadedFile(filename, content, content_type)


class YouTubeDownloader(Downloader):
    """Downloader for YouTube watch-page URLs, backed by pytube."""

    @staticmethod
    def download(url: str):
        """Download the first matching video for *url* as an uploaded file.

        The video is written to a temporary file in the current working
        directory by pytube, read back into memory and then removed, whether
        or not the download succeeded.

        Returns:
            A ``SimpleUploadedFile`` with the video content, or ``None``
            when no stream in ``YOUTUBE_VIDEO_FORMAT`` is available.

        Raises:
            Any exception raised by pytube, by reading the file, or by
            ``validate_file_size`` through the progress callback.
        """
        yt = YouTube()
        yt.from_url(url)

        videos = yt.filter(YOUTUBE_VIDEO_FORMAT)
        if not videos:
            return None

        video = videos[0]
        filename = '{}.{}'.format(video.filename, video.extension)
        try:
            video.download(on_progress=YouTubeDownloader.on_progress)
            with open(filename, 'rb') as video_file:
                content = video_file.read()
        finally:
            # Remove the temporary file on success and on failure alike;
            # it may not exist if the download failed before creating it.
            if os.path.isfile(filename):
                os.remove(filename)

        # NOTE(review): the third argument is the content type, but the
        # bare extension is passed here -- confirm that callers expect this.
        return SimpleUploadedFile(filename, content, video.extension)

    @staticmethod
    def handles(url: str) -> bool:
        """Return whether *url* starts with a www.youtube.com watch URL."""
        return YOUTUBE_URL.match(url) is not None

    @staticmethod
    def on_progress(bytes, file_size, start):
        """Progress callback that validates the reported total file size.

        Only *file_size* is used. The parameter names (including ``bytes``,
        which shadows the builtin) are kept unchanged so that existing
        callers are not affected.
        """
        validate_file_size(file_size)