##// END OF EJS Templates
Set default query split to 10. Use logger to log post being parsed
Set default query split to 10. Use logger to log post being parsed

File last commit:

r1532:320de1f1 merge decentral
r1624:bea11db5 default
Show More
downloaders.py
79 lines | 2.2 KiB | text/x-python | PythonLexer
import os
import re
from django.core.files.uploadedfile import SimpleUploadedFile, \
TemporaryUploadedFile
from pytube import YouTube
import requests
from boards.utils import validate_file_size
YOUTUBE_VIDEO_FORMAT = 'webm'
HTTP_RESULT_OK = 200
HEADER_CONTENT_LENGTH = 'content-length'
HEADER_CONTENT_TYPE = 'content-type'
FILE_DOWNLOAD_CHUNK_BYTES = 200000
YOUTUBE_URL = re.compile(r'https?://((www\.)?youtube\.com/watch\?v=|youtu.be/)[-\w]+')
class Downloader:
@staticmethod
def handles(url: str) -> bool:
return False
@staticmethod
def download(url: str):
# Verify content headers
response_head = requests.head(url, verify=False)
content_type = response_head.headers[HEADER_CONTENT_TYPE].split(';')[0]
length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
if length_header:
length = int(length_header)
validate_file_size(length)
# Get the actual content into memory
response = requests.get(url, verify=False, stream=True)
# Download file, stop if the size exceeds limit
size = 0
# Set a dummy file name that will be replaced
# anyway, just keep the valid extension
filename = 'file.' + content_type.split('/')[1]
file = TemporaryUploadedFile(filename, content_type, 0, None, None)
for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
size += len(chunk)
validate_file_size(size)
file.write(chunk)
if response.status_code == HTTP_RESULT_OK:
return file
def download(url):
for downloader in Downloader.__subclasses__():
if downloader.handles(url):
return downloader.download(url)
# If nobody of the specific downloaders handles this, use generic
# one
return Downloader.download(url)
class YouTubeDownloader(Downloader):
@staticmethod
def download(url: str):
yt = YouTube()
yt.from_url(url)
videos = yt.filter(YOUTUBE_VIDEO_FORMAT)
if len(videos) > 0:
video = videos[0]
return Downloader.download(video.url)
@staticmethod
def handles(url: str) -> bool:
return YOUTUBE_URL.match(url)