##// END OF EJS Templates
If thread is specified in the post template, do not load it again
If thread is specified in the post template, do not load it again

File last commit:

r1532:320de1f1 merge decentral
r1670:c9facaf1 default
Show More
downloaders.py
79 lines | 2.2 KiB | text/x-python | PythonLexer
import os
import re
from django.core.files.uploadedfile import SimpleUploadedFile, \
TemporaryUploadedFile
from pytube import YouTube
import requests
from boards.utils import validate_file_size
YOUTUBE_VIDEO_FORMAT = 'webm'
HTTP_RESULT_OK = 200
HEADER_CONTENT_LENGTH = 'content-length'
HEADER_CONTENT_TYPE = 'content-type'
FILE_DOWNLOAD_CHUNK_BYTES = 200000
YOUTUBE_URL = re.compile(r'https?://((www\.)?youtube\.com/watch\?v=|youtu.be/)[-\w]+')
class Downloader:
@staticmethod
def handles(url: str) -> bool:
return False
@staticmethod
def download(url: str):
# Verify content headers
response_head = requests.head(url, verify=False)
content_type = response_head.headers[HEADER_CONTENT_TYPE].split(';')[0]
length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
if length_header:
length = int(length_header)
validate_file_size(length)
# Get the actual content into memory
response = requests.get(url, verify=False, stream=True)
# Download file, stop if the size exceeds limit
size = 0
# Set a dummy file name that will be replaced
# anyway, just keep the valid extension
filename = 'file.' + content_type.split('/')[1]
file = TemporaryUploadedFile(filename, content_type, 0, None, None)
for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
size += len(chunk)
validate_file_size(size)
file.write(chunk)
if response.status_code == HTTP_RESULT_OK:
return file
def download(url):
for downloader in Downloader.__subclasses__():
if downloader.handles(url):
return downloader.download(url)
# If nobody of the specific downloaders handles this, use generic
# one
return Downloader.download(url)
class YouTubeDownloader(Downloader):
@staticmethod
def download(url: str):
yt = YouTube()
yt.from_url(url)
videos = yt.filter(YOUTUBE_VIDEO_FORMAT)
if len(videos) > 0:
video = videos[0]
return Downloader.download(video.url)
@staticmethod
def handles(url: str) -> bool:
return YOUTUBE_URL.match(url)