##// END OF EJS Templates
Force validate file size on sync
Bohdan Horbeshko -
r2145:c3a97d51 lite
parent child Browse files
Show More
@@ -1,101 +1,101 b''
1 1 import re
2 2
3 3 import requests
4 4 from django.core.files.uploadedfile import TemporaryUploadedFile
5 5 from pytube import YouTube
6 6
7 7 from boards.utils import validate_file_size
8 8
9 9 YOUTUBE_VIDEO_FORMAT = 'webm'
10 10
11 11 HTTP_RESULT_OK = 200
12 12
13 13 HEADER_CONTENT_LENGTH = 'content-length'
14 14 HEADER_CONTENT_TYPE = 'content-type'
15 15
16 16 FILE_DOWNLOAD_CHUNK_BYTES = 200000
17 17
18 18 REGEX_YOUTUBE_URL = re.compile(r'https?://((www\.)?youtube\.com/watch\?v=|youtu.be/)[-\w]+')
19 19 REGEX_MAGNET = re.compile(r'magnet:\?xt=urn:(btih:)?[a-z0-9]{20,50}.*')
20 20
21 21 TYPE_URL_ONLY = (
22 22 'application/xhtml+xml',
23 23 'text/html',
24 24 )
25 25
26 26
27 27 class Downloader:
28 28 @staticmethod
29 29 def handles(url: str) -> bool:
30 30 return True
31 31
32 32 @staticmethod
33 33 def download(url: str, validate):
34 34 # Verify content headers
35 35 response_head = requests.head(url, verify=False)
36 36 content_type = response_head.headers[HEADER_CONTENT_TYPE].split(';')[0]
37 37 if validate and content_type in TYPE_URL_ONLY:
38 38 return None
39 39
40 40 length_header = response_head.headers.get(HEADER_CONTENT_LENGTH)
41 if validate and length_header:
41 if length_header:
42 42 length = int(length_header)
43 43 validate_file_size(length)
44 44 # Get the actual content into memory
45 45 response = requests.get(url, verify=False, stream=True)
46 46
47 47 if response.status_code == HTTP_RESULT_OK:
48 48 # Download file, stop if the size exceeds limit
49 49 size = 0
50 50
51 51 # Set a dummy file name that will be replaced
52 52 # anyway, just keep the valid extension
53 53 filename = 'file.' + content_type.split('/')[1]
54 54
55 55 file = TemporaryUploadedFile(filename, content_type, 0, None, None)
56 56 for chunk in response.iter_content(FILE_DOWNLOAD_CHUNK_BYTES):
57 57 size += len(chunk)
58 58 validate_file_size(size)
59 59 file.write(chunk)
60 60
61 61 return file
62 62
63 63
64 64 class YouTubeDownloader(Downloader):
65 65 @staticmethod
66 66 def download(url: str, validate):
67 67 yt = YouTube()
68 68 yt.from_url(url)
69 69 videos = yt.filter(YOUTUBE_VIDEO_FORMAT)
70 70 if len(videos) > 0:
71 71 video = videos[0]
72 72 return Downloader.download(video.url)
73 73
74 74 @staticmethod
75 75 def handles(url: str) -> bool:
76 76 return REGEX_YOUTUBE_URL.match(url) is not None
77 77
78 78
79 79 class NothingDownloader(Downloader):
80 80 @staticmethod
81 81 def handles(url: str) -> bool:
82 82 return REGEX_MAGNET.match(url)
83 83
84 84 @staticmethod
85 85 def download(url: str, validate):
86 86 return None
87 87
88 88
89 89 DOWNLOADERS = (
90 90 YouTubeDownloader,
91 91 NothingDownloader,
92 92 Downloader,
93 93 )
94 94
95 95
96 96 def download(url, validate=True):
97 97 for downloader in DOWNLOADERS:
98 98 if downloader.handles(url):
99 99 return downloader.download(url, validate=validate)
100 100 raise Exception('No downloader supports this URL.')
101 101
General Comments 0
You need to be logged in to leave comments. Login now