-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathupload_media.py
446 lines (354 loc) · 16.2 KB
/
upload_media.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
from __future__ import annotations
import json
import os
import shutil
from glob import glob
from pathlib import Path
import requests
from loguru import logger
from tqdm import tqdm
from szurubooru_toolkit import config
from szurubooru_toolkit import szuru
from szurubooru_toolkit.scripts import auto_tagger
from szurubooru_toolkit.scripts import tag_posts
from szurubooru_toolkit.szurubooru import Post
from szurubooru_toolkit.szurubooru import Szurubooru
from szurubooru_toolkit.utils import get_md5sum
from szurubooru_toolkit.utils import shrink_img
def get_files(upload_dir: str) -> list:
    """
    Collect every supported image/video file below `upload_dir`.

    The directory is searched recursively with one glob pass per allowed extension ('jpg', 'jpeg', 'png', 'mp4',
    'webm', 'gif', 'swf' and 'webp'), so the result is grouped by extension in that order.

    Args:
        upload_dir (str): The directory on the local system which contains the images/videos you want to upload.
            It is treated as a literal path, i.e. characters such as '[', '*' or '?' in the directory name are
            not interpreted as glob wildcards.

    Returns:
        list: The path of each image/video found (subdirectories included). Empty if nothing matches.
    """

    # Imported locally because the module only imports the `glob` function itself.
    from glob import escape

    allowed_extensions = ['jpg', 'jpeg', 'png', 'mp4', 'webm', 'gif', 'swf', 'webp']

    # Without escaping, a directory like "[artist] set" would be read as a character class and its
    # files would silently never be found.
    search_root = escape(upload_dir)

    files = []
    for extension in allowed_extensions:
        files.extend(glob(search_root + '/**/*.' + extension, recursive=True))

    return files
def get_media_token(szuru: Szurubooru, media: bytes) -> str:
    """
    Send the media bytes to the temporary upload endpoint and return the token from the reply.

    The bytes are posted as the multipart field 'content' to `<api url>/uploads` using the headers of the given
    szurubooru object.

    Args:
        szuru (Szurubooru): A szurubooru object providing `szuru_api_url` and `headers`.
        media (bytes): The media file to upload as bytes.

    Returns:
        str: The 'token' field of the JSON response. If the request fails, or the response carries a
            'description' field, the problem is logged as critical and the function falls through, so the
            caller receives None.
    """

    upload_endpoint = szuru.szuru_api_url + '/uploads'

    try:
        # Decode the body once; a 'description' key marks an error reply.
        reply = requests.post(upload_endpoint, files={'content': media}, headers=szuru.headers).json()

        if 'description' not in reply:
            return reply['token']

        raise Exception(reply['description'])
    except Exception as e:
        logger.critical(f'An error occured while getting the image token: {e}')
def check_similarity(szuru: Szurubooru, image_token: str) -> tuple | None:
    """
    Run a reverse image search for a temporarily uploaded image.

    The token is posted as `contentToken` to `<api url>/posts/reverse-search`.

    Args:
        szuru (Szurubooru): A szurubooru object providing `szuru_api_url` and `headers`.
        image_token (str): An image token from szurubooru.

    Returns:
        tuple: Always a 3-tuple `(exact_post, similar_posts, errors)`.
            On success these are the response's 'exactPost' and 'similarPosts' fields and `False`.
            If the request fails, the response carries a 'description' field or an expected field is missing,
            a warning is logged and `(False, [], True)` is returned instead of raising.
    """

    search_endpoint = szuru.szuru_api_url + '/posts/reverse-search'
    payload = json.dumps({'contentToken': image_token})

    try:
        # Decode the body once; a 'description' key marks an error reply.
        result = requests.post(search_endpoint, headers=szuru.headers, data=payload).json()

        if 'description' in result:
            raise Exception(result['description'])

        return result['exactPost'], result['similarPosts'], False
    except Exception as e:
        logger.warning(f'An error occured during the similarity check: {e}. Skipping post...')
        return False, [], True
def upload_file(szuru: Szurubooru, post: Post) -> None:
    """
    Create the post on szurubooru from a previously uploaded temporary file.

    A JSON document with the post's tags, safety, source, relations and content token is sent to
    `<api url>/posts`. A missing safety falls back to `config.upload_media['default_safety']`, a missing source
    to an empty string.

    Args:
        szuru (Szurubooru): A szurubooru object providing `szuru_api_url` and `headers`.
        post (Post): Post object with `tags`, `safety`, `source`, `similar_posts`, `token` and `file_path` set.

    Returns:
        The 'id' field of the response on success (despite the `None` annotation). If the request fails or the
        response carries a 'description' field, a warning is logged and None is returned.
    """

    safety = post.safety or config.upload_media['default_safety']
    source = post.source or ''

    create_endpoint = szuru.szuru_api_url + '/posts'
    payload = json.dumps(
        {
            'tags': post.tags,
            'safety': safety,
            'source': source,
            'relations': post.similar_posts,
            'contentToken': post.token,
        },
    )

    try:
        # Decode the body once; a 'description' key marks an error reply.
        created = requests.post(create_endpoint, headers=szuru.headers, data=payload).json()

        if 'description' not in created:
            return created['id']

        raise Exception(created['description'])
    except Exception as e:
        logger.warning(f'An error occured during the upload for file "{post.file_path}": {e}')
        return None
def cleanup_dirs(dir: str) -> None:
    """
    Prune empty directories below `dir`, working from the deepest level upwards.

    Along the way, 'Thumbs.db' files (created by Windows) are deleted and '@eaDir' directories (created on
    Synology systems) are removed together with their content. `dir` itself is never removed.

    Args:
        dir (str): The directory under which to cleanup - dir is the root level and won't get deleted.

    Raises:
        OSError: If deleting a 'Thumbs.db' file or an '@eaDir' tree fails. Failures to remove an ordinary
            (e.g. non-empty) directory are ignored.

    Returns:
        None
    """

    for current, subdirs, filenames in os.walk(dir, topdown=False):
        # File names are unique within a directory, so a membership test is enough.
        if 'Thumbs.db' in filenames:
            os.remove(os.path.join(current, 'Thumbs.db'))

        for subdir in subdirs:
            target = os.path.join(current, subdir)

            if subdir == '@eaDir':
                shutil.rmtree(target)

            # Only succeeds for empty directories; anything else (non-empty, already gone) is skipped.
            try:
                os.rmdir(target)
            except OSError:
                continue
def eval_convert_image(file: bytes, file_ext: str, file_to_upload: str = None) -> tuple[bytes | str]:
    """
    Decide from the `upload_media` configuration whether the image gets converted and/or shrunk, and do it.

    Conversion is requested (`convert=True` plus `convert_quality`) when 'convert_to_jpg' is enabled, the
    extension is 'png' and the file is larger than 'convert_threshold'. Shrinking is requested
    (`shrink_threshold` plus `shrink_dimensions`) when 'shrink' is enabled. Both can apply to the same file, in
    which case `shrink_img` is called once with all options. If neither applies, the file is returned untouched.

    Args:
        file (bytes): The file as a byte string.
        file_ext (str): The file extension without a dot.
        file_to_upload (str, optional): The file path of the file to upload (only for logging). Defaults to None.

    Returns:
        Tuple[bytes, str]: The (possibly converted and/or shrunk) file as a byte string and the MD5 sum of the
            original file. If `shrink_img` raises an OSError, a warning is logged and the original bytes are
            returned.
    """

    file_size = len(file)
    original_md5 = get_md5sum(file)
    image = file

    try:
        settings = config.upload_media

        convert = settings['convert_to_jpg'] and file_ext == 'png' and file_size > settings['convert_threshold']
        shrink = settings['shrink']

        if convert and shrink:
            logger.debug(
                f'Converting and shrinking file, size {file_size} > {config.upload_media["convert_threshold"]}',
            )
        elif convert:
            logger.debug(
                f'Converting file, size {file_size} > {config.upload_media["convert_threshold"]}',
            )
        elif shrink:
            logger.debug('Shrinking file...')

        # Assemble the keyword arguments for the single shrink_img call.
        options = {}
        if shrink:
            options['shrink_threshold'] = settings['shrink_threshold']
            options['shrink_dimensions'] = settings['shrink_dimensions']
        if convert:
            options['convert'] = True
            options['convert_quality'] = settings['convert_quality']

        if options:
            image = shrink_img(file, **options)
    except OSError:
        logger.warning(f'Could not shrink image {file_to_upload}. Keeping dimensions...')

    return image, original_md5
def upload_post(
    file: bytes,
    file_ext: str,
    metadata: dict = None,
    file_path: str = None,
    saucenao_limit_reached: bool = False,
) -> tuple[bool, bool]:
    """
    Uploads given file to szurubooru unless an identical or too similar post already exists.

    Files other than 'mp4', 'webm' and 'gif' are first passed through `eval_convert_image`. The media is then
    uploaded to the temporary endpoint and a reverse search is performed with the module-level `szuru` client.
    A similar post whose distance is below `1 - max_similarity` is treated like an exact match.

    If no match exists, the post is created (with all similar posts as relations) and, if
    `config.upload_media['auto_tag']` is set, handed to the auto tagger. If a match exists and
    `config.import_from_url['update_tags_if_exists']` is set and metadata was supplied, the tags and source of
    the existing post are updated in append mode instead.

    Args:
        file (bytes): The file as bytes.
        file_ext (str): The file extension.
        metadata (dict, optional): Attach metadata to the post ('tags', 'safety', 'source', optionally
            'tag_string'). Defaults to None.
        file_path (str, optional): The path to the file (used for debugging). Defaults to None.
        saucenao_limit_reached (bool, optional): If the SauceNAO limit has been reached. Defaults to False.

    Returns:
        Tuple[bool, bool]: A tuple where the first element indicates if the upload was successful or not (an
            already existing post counts as success), and the second element indicates if the SauceNAO limit has
            been reached. On failure the incoming limit flag is passed through unchanged.
    """

    post = Post()
    original_md5 = ''

    # Videos and GIFs are uploaded as-is; everything else may be converted and/or shrunk first.
    if file_ext not in ['mp4', 'webm', 'gif']:
        post.media, original_md5 = eval_convert_image(file, file_ext, file_path)
    else:
        post.media = file

    post.token = get_media_token(szuru, post.media)
    post.exact_post, similar_posts, errors = check_similarity(szuru, post.token)

    if errors:
        # Hand the incoming flag back instead of a hard-coded False, so an already reached SauceNAO
        # limit is not forgotten by callers that feed the result into the next upload.
        return False, saucenao_limit_reached

    threshold = 1 - float(config.upload_media['max_similarity'])

    if not post.exact_post:
        for entry in similar_posts:
            if entry['distance'] < threshold:
                logger.debug(
                    f'File "{file_path}" is too similar to post {entry["post"]["id"]} '
                    f'({(1 - entry["distance"]) * 100:.2f}%)',
                )
                # Keep the matching entry (not just True) so its post id is still available
                # when the tags of the existing post get updated below.
                post.exact_post = entry
                break

    if not post.exact_post:
        if not metadata:
            post.tags = config.upload_media['tags']
            post.safety = None
            post.source = None
        else:
            post.tags = metadata['tags']
            post.safety = metadata['safety']
            post.source = metadata['source']

        post.file_path = file_path
        post.similar_posts = [entry['post']['id'] for entry in similar_posts]

        post_id = upload_file(szuru, post)

        if not post_id:
            return False, saucenao_limit_reached

        # Tag post if enabled
        if config.upload_media['auto_tag']:
            saucenao_limit_reached = auto_tagger.main(
                post_id=str(post_id),
                file_to_upload=post.media,
                limit_reached=saucenao_limit_reached,
                md5=original_md5,
            )
    else:
        logger.debug('File is already uploaded')

        if config.import_from_url['update_tags_if_exists'] and metadata:
            # An exact match carries the id directly, a similar entry nests it under 'post'.
            matched = post.exact_post
            existing_id = str(matched['id']) if 'id' in matched else str(matched['post']['id'])
            logger.debug(f'Trying to update tags for post {existing_id}...')

            config.tag_posts['mode'] = 'append'

            # Fall back to the space separated tag string if no tag list was supplied.
            try:
                if not metadata['tags'] and metadata['tag_string']:
                    metadata['tags'] = metadata['tag_string'].split(' ')
            except KeyError:
                pass

            config.tag_posts['silence_info'] = True
            tag_posts.main(query=existing_id, add_tags=metadata['tags'], source=metadata['source'])

    return True, saucenao_limit_reached
def main(
    src_path: str = '',
    file_to_upload: bytes = None,
    file_ext: str = None,
    metadata: dict = None,
    saucenao_limit_reached: bool = False,
) -> int:
    """
    Main logic of the script.

    Two modes are supported:

    * Single file mode (`file_to_upload` given): the bytes are uploaded directly together with `file_ext` and
      `metadata`, and progress output is disabled in the configuration.
    * Directory mode: every supported file below `src_path` is uploaded. If `src_path` is empty,
      `config.upload_media['src_path']` is used. With `config.upload_media['cleanup']` enabled, successfully
      uploaded files are deleted and the scanned directory is pruned afterwards.

    Args:
        src_path (str, optional): The source path where to look for files to upload. An already collected
            iterable of file paths is accepted as well. Defaults to ''.
        file_to_upload (bytes, optional): A specific file to upload. Defaults to None.
        file_ext (str, optional): The file extension of the file to upload. Defaults to None.
        metadata (dict, optional): Metadata to attach to the post. Defaults to None.
        saucenao_limit_reached (bool, optional): If the SauceNAO limit has been reached. Defaults to False.

    Returns:
        In single file mode, the (possibly updated) SauceNAO limit flag. In directory mode nothing is returned
        (None). The `int` annotation is kept for interface compatibility only.

    Raises:
        SystemExit: With exit code 1 if the user interrupts the run with Ctrl+C.
    """

    try:
        if file_to_upload:
            # Called by another script with the file content already in memory.
            config.upload_media['hide_progress'] = True
            _, saucenao_limit_reached = upload_post(
                file_to_upload,
                file_ext,
                metadata,
                saucenao_limit_reached=saucenao_limit_reached,
            )

            return saucenao_limit_reached

        # Directory that was scanned; stays None if the caller handed over a ready-made list of files.
        scanned_dir = None

        if src_path:
            if isinstance(src_path, (str, os.PathLike)):
                # A path has to be scanned; iterating the string itself would yield single characters.
                scanned_dir = os.fspath(src_path)
                files_to_upload = get_files(scanned_dir)
            else:
                files_to_upload = src_path
        else:
            try:
                scanned_dir = config.upload_media['src_path']
            except KeyError:
                logger.critical('No files found to upload. Please specify a source path.')
                return None
            files_to_upload = get_files(scanned_dir)

        if not files_to_upload:
            logger.info('No files found to upload.')
            return None

        logger.info('Found ' + str(len(files_to_upload)) + ' file(s). Starting upload...')

        try:
            hide_progress = config.globals['hide_progress']
        except KeyError:
            hide_progress = config.tag_posts['hide_progress']

        for file_path in tqdm(
            files_to_upload,
            ncols=80,
            position=0,
            leave=False,
            disable=hide_progress,
        ):
            with open(file_path, 'rb') as f:
                file = f.read()

            success, saucenao_limit_reached = upload_post(
                file,
                # Lower-cased so that e.g. '.PNG' or '.GIF' take the same path as their lowercase forms.
                file_ext=Path(file_path).suffix[1:].lower(),
                file_path=file_path,
                saucenao_limit_reached=saucenao_limit_reached,
            )

            if config.upload_media['cleanup'] and success and os.path.exists(file_path):
                os.remove(file_path)

        if config.upload_media['cleanup']:
            # Remove dirs after files have been deleted; prune the directory that was actually scanned.
            cleanup_dirs(scanned_dir if scanned_dir is not None else config.upload_media['src_path'])

        logger.success('Script has finished uploading!')
    except KeyboardInterrupt:
        logger.info('Received keyboard interrupt from user.')
        # Same effect as exit(1), but does not depend on the `site` module providing that helper.
        raise SystemExit(1)
# Run the uploader with its default arguments when the module is executed as a script.
if __name__ == '__main__':
    main()