mirror of
https://github.com/janeczku/calibre-web
synced 2025-10-13 06:37:40 +00:00
Merge branch Develop into thumbnails
This commit is contained in:
186
cps/helper.py
186
cps/helper.py
@@ -17,19 +17,18 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from __future__ import division, print_function, unicode_literals
|
||||
import sys
|
||||
import os
|
||||
import io
|
||||
import mimetypes
|
||||
import re
|
||||
import shutil
|
||||
import time
|
||||
import socket
|
||||
import unicodedata
|
||||
from datetime import datetime, timedelta
|
||||
from tempfile import gettempdir
|
||||
|
||||
from urllib.parse import urlparse
|
||||
import requests
|
||||
|
||||
from babel.dates import format_datetime
|
||||
from babel.units import format_unit
|
||||
from flask import send_from_directory, make_response, redirect, abort, url_for
|
||||
@@ -39,11 +38,7 @@ from sqlalchemy.sql.expression import true, false, and_, or_, text, func
|
||||
from werkzeug.datastructures import Headers
|
||||
from werkzeug.security import generate_password_hash
|
||||
from markupsafe import escape
|
||||
|
||||
try:
|
||||
from urllib.parse import quote
|
||||
except ImportError:
|
||||
from urllib import quote
|
||||
from urllib.parse import quote
|
||||
|
||||
try:
|
||||
import unidecode
|
||||
@@ -51,9 +46,9 @@ try:
|
||||
except ImportError:
|
||||
use_unidecode = False
|
||||
|
||||
from . import calibre_db
|
||||
from . import calibre_db, cli
|
||||
from .tasks.convert import TaskConvert
|
||||
from . import logger, config, get_locale, db, fs, ub
|
||||
from . import logger, config, get_locale, db, ub, kobo_sync_status, fs
|
||||
from . import gdriveutils as gd
|
||||
from .constants import STATIC_DIR as _STATIC_DIR, CACHE_TYPE_THUMBNAILS, THUMBNAIL_TYPE_COVER, THUMBNAIL_TYPE_SERIES
|
||||
from .subproc_wrapper import process_wait
|
||||
@@ -66,7 +61,7 @@ log = logger.create()
|
||||
|
||||
try:
|
||||
from wand.image import Image
|
||||
from wand.exceptions import MissingDelegateError
|
||||
from wand.exceptions import MissingDelegateError, BlobError
|
||||
use_IM = True
|
||||
except (ImportError, RuntimeError) as e:
|
||||
log.debug('Cannot import Image, generating covers from non jpg files will not work: %s', e)
|
||||
@@ -236,7 +231,7 @@ def get_valid_filename(value, replace_whitespace=True):
|
||||
value = value[:-1]+u'_'
|
||||
value = value.replace("/", "_").replace(":", "_").strip('\0')
|
||||
if use_unidecode:
|
||||
if not config.config_unicode_filename:
|
||||
if config.config_unicode_filename:
|
||||
value = (unidecode.unidecode(value))
|
||||
else:
|
||||
value = value.replace(u'§', u'SS')
|
||||
@@ -338,8 +333,44 @@ def delete_book_file(book, calibrepath, book_format=None):
|
||||
path=book.path)
|
||||
|
||||
|
||||
def clean_author_database(renamed_author, calibrepath, local_book=None, gdrive=None):
|
||||
valid_filename_authors = [get_valid_filename(r) for r in renamed_author]
|
||||
for r in renamed_author:
|
||||
if local_book:
|
||||
all_books = [local_book]
|
||||
else:
|
||||
all_books = calibre_db.session.query(db.Books) \
|
||||
.filter(db.Books.authors.any(db.Authors.name == r)).all()
|
||||
for book in all_books:
|
||||
book_author_path = book.path.split('/')[0]
|
||||
if book_author_path in valid_filename_authors or local_book:
|
||||
new_author = calibre_db.session.query(db.Authors).filter(db.Authors.name == r).first()
|
||||
all_new_authordir = get_valid_filename(new_author.name)
|
||||
all_titledir = book.path.split('/')[1]
|
||||
all_new_path = os.path.join(calibrepath, all_new_authordir, all_titledir)
|
||||
all_new_name = get_valid_filename(book.title) + ' - ' + all_new_authordir
|
||||
# change location in database to new author/title path
|
||||
book.path = os.path.join(all_new_authordir, all_titledir).replace('\\', '/')
|
||||
for file_format in book.data:
|
||||
if not gdrive:
|
||||
shutil.move(os.path.normcase(os.path.join(all_new_path,
|
||||
file_format.name + '.' + file_format.format.lower())),
|
||||
os.path.normcase(os.path.join(all_new_path,
|
||||
all_new_name + '.' + file_format.format.lower())))
|
||||
else:
|
||||
gFile = gd.getFileFromEbooksFolder(all_new_path,
|
||||
file_format.name + '.' + file_format.format.lower())
|
||||
if gFile:
|
||||
gd.moveGdriveFileRemote(gFile, all_new_name + u'.' + file_format.format.lower())
|
||||
gd.updateDatabaseOnEdit(gFile['id'], all_new_name + u'.' + file_format.format.lower())
|
||||
else:
|
||||
log.error("File {} not found on gdrive"
|
||||
.format(all_new_path, file_format.name + '.' + file_format.format.lower()))
|
||||
file_format.name = all_new_name
|
||||
|
||||
|
||||
# Moves files in file storage during author/title rename, or from temp dir to file storage
|
||||
def update_dir_structure_file(book_id, calibrepath, first_author, orignal_filepath, db_filename):
|
||||
def update_dir_structure_file(book_id, calibrepath, first_author, orignal_filepath, db_filename, renamed_author):
|
||||
# get book database entry from id, if original path overwrite source with original_filepath
|
||||
localbook = calibre_db.get_book(book_id)
|
||||
if orignal_filepath:
|
||||
@@ -355,21 +386,35 @@ def update_dir_structure_file(book_id, calibrepath, first_author, orignal_filepa
|
||||
# Create new titledir from database and add id
|
||||
if first_author:
|
||||
new_authordir = get_valid_filename(first_author)
|
||||
for r in renamed_author:
|
||||
new_author = calibre_db.session.query(db.Authors).filter(db.Authors.name == r).first()
|
||||
old_author_dir = get_valid_filename(r)
|
||||
new_author_rename_dir = get_valid_filename(new_author.name)
|
||||
if os.path.isdir(os.path.join(calibrepath, old_author_dir)):
|
||||
try:
|
||||
old_author_path = os.path.join(calibrepath, old_author_dir)
|
||||
new_author_path = os.path.join(calibrepath, new_author_rename_dir)
|
||||
shutil.move(os.path.normcase(old_author_path), os.path.normcase(new_author_path))
|
||||
except (OSError) as ex:
|
||||
log.error("Rename author from: %s to %s: %s", old_author_path, new_author_path, ex)
|
||||
log.debug(ex, exc_info=True)
|
||||
return _("Rename author from: '%(src)s' to '%(dest)s' failed with error: %(error)s",
|
||||
src=old_author_path, dest=new_author_path, error=str(ex))
|
||||
else:
|
||||
new_authordir = get_valid_filename(localbook.authors[0].name)
|
||||
new_titledir = get_valid_filename(localbook.title) + " (" + str(book_id) + ")"
|
||||
|
||||
if titledir != new_titledir or authordir != new_authordir or orignal_filepath:
|
||||
new_path = os.path.join(calibrepath, new_authordir, new_titledir)
|
||||
new_name = get_valid_filename(localbook.title) + ' - ' + get_valid_filename(new_authordir)
|
||||
new_name = get_valid_filename(localbook.title) + ' - ' + new_authordir
|
||||
try:
|
||||
if orignal_filepath:
|
||||
if not os.path.isdir(new_path):
|
||||
os.makedirs(new_path)
|
||||
shutil.move(os.path.normcase(path), os.path.normcase(os.path.join(new_path, db_filename)))
|
||||
log.debug("Moving title: %s to %s/%s", path, new_path, new_name)
|
||||
# Check new path is not valid path
|
||||
else:
|
||||
# Check new path is not valid path
|
||||
if not os.path.exists(new_path):
|
||||
# move original path to new path
|
||||
log.debug("Moving title: %s to %s", path, new_path)
|
||||
@@ -381,7 +426,6 @@ def update_dir_structure_file(book_id, calibrepath, first_author, orignal_filepa
|
||||
for file in file_list:
|
||||
shutil.move(os.path.normcase(os.path.join(dir_name, file)),
|
||||
os.path.normcase(os.path.join(new_path + dir_name[len(path):], file)))
|
||||
# os.unlink(os.path.normcase(os.path.join(dir_name, file)))
|
||||
# change location in database to new author/title path
|
||||
localbook.path = os.path.join(new_authordir, new_titledir).replace('\\','/')
|
||||
except (OSError) as ex:
|
||||
@@ -392,31 +436,38 @@ def update_dir_structure_file(book_id, calibrepath, first_author, orignal_filepa
|
||||
|
||||
# Rename all files from old names to new names
|
||||
try:
|
||||
for file_format in localbook.data:
|
||||
shutil.move(os.path.normcase(
|
||||
os.path.join(new_path, file_format.name + '.' + file_format.format.lower())),
|
||||
os.path.normcase(os.path.join(new_path, new_name + '.' + file_format.format.lower())))
|
||||
file_format.name = new_name
|
||||
if not orignal_filepath and len(os.listdir(os.path.dirname(path))) == 0:
|
||||
clean_author_database(renamed_author, calibrepath)
|
||||
if first_author not in renamed_author:
|
||||
clean_author_database([first_author], calibrepath, localbook)
|
||||
if not renamed_author and not orignal_filepath and len(os.listdir(os.path.dirname(path))) == 0:
|
||||
shutil.rmtree(os.path.dirname(path))
|
||||
except (OSError) as ex:
|
||||
log.error("Rename file in path %s to %s: %s", new_path, new_name, ex)
|
||||
except (OSError, FileNotFoundError) as ex:
|
||||
log.error("Error in rename file in path %s", ex)
|
||||
log.debug(ex, exc_info=True)
|
||||
return _("Rename file in path '%(src)s' to '%(dest)s' failed with error: %(error)s",
|
||||
src=new_path, dest=new_name, error=str(ex))
|
||||
return _("Error in rename file in path: %(error)s", error=str(ex))
|
||||
return False
|
||||
|
||||
|
||||
def update_dir_structure_gdrive(book_id, first_author):
|
||||
def update_dir_structure_gdrive(book_id, first_author, renamed_author):
|
||||
error = False
|
||||
book = calibre_db.get_book(book_id)
|
||||
path = book.path
|
||||
|
||||
authordir = book.path.split('/')[0]
|
||||
if first_author:
|
||||
new_authordir = get_valid_filename(first_author)
|
||||
for r in renamed_author:
|
||||
# Todo: Rename all authors on gdrive
|
||||
new_author = calibre_db.session.query(db.Authors).filter(db.Authors.name == r).first()
|
||||
old_author_dir = get_valid_filename(r)
|
||||
new_author_rename_dir = get_valid_filename(new_author.name)
|
||||
gFile = gd.getFileFromEbooksFolder(None, old_author_dir)
|
||||
if gFile:
|
||||
gd.moveGdriveFolderRemote(gFile, new_author_rename_dir)
|
||||
else:
|
||||
error = _(u'File %(file)s not found on Google Drive', file=authordir) # file not found
|
||||
else:
|
||||
new_authordir = get_valid_filename(book.authors[0].name)
|
||||
|
||||
titledir = book.path.split('/')[1]
|
||||
new_titledir = get_valid_filename(book.title) + u" (" + str(book_id) + u")"
|
||||
|
||||
@@ -426,31 +477,27 @@ def update_dir_structure_gdrive(book_id, first_author):
|
||||
gFile['title'] = new_titledir
|
||||
gFile.Upload()
|
||||
book.path = book.path.split('/')[0] + u'/' + new_titledir
|
||||
path = book.path
|
||||
gd.updateDatabaseOnEdit(gFile['id'], book.path) # only child folder affected
|
||||
else:
|
||||
error = _(u'File %(file)s not found on Google Drive', file=book.path) # file not found
|
||||
|
||||
if authordir != new_authordir:
|
||||
if authordir != new_authordir and authordir not in renamed_author:
|
||||
gFile = gd.getFileFromEbooksFolder(os.path.dirname(book.path), new_titledir)
|
||||
if gFile:
|
||||
gd.moveGdriveFolderRemote(gFile, new_authordir)
|
||||
book.path = new_authordir + u'/' + book.path.split('/')[1]
|
||||
path = book.path
|
||||
gd.updateDatabaseOnEdit(gFile['id'], book.path)
|
||||
else:
|
||||
error = _(u'File %(file)s not found on Google Drive', file=authordir) # file not found
|
||||
# Rename all files from old names to new names
|
||||
|
||||
if authordir != new_authordir or titledir != new_titledir:
|
||||
new_name = get_valid_filename(book.title) + u' - ' + get_valid_filename(new_authordir)
|
||||
for file_format in book.data:
|
||||
gFile = gd.getFileFromEbooksFolder(path, file_format.name + u'.' + file_format.format.lower())
|
||||
if not gFile:
|
||||
error = _(u'File %(file)s not found on Google Drive', file=file_format.name) # file not found
|
||||
break
|
||||
gd.moveGdriveFileRemote(gFile, new_name + u'.' + file_format.format.lower())
|
||||
file_format.name = new_name
|
||||
# change location in database to new author/title path
|
||||
book.path = os.path.join(new_authordir, new_titledir).replace('\\', '/')
|
||||
|
||||
# Rename all files from old names to new names
|
||||
clean_author_database(renamed_author, "", gdrive=True)
|
||||
if first_author not in renamed_author:
|
||||
clean_author_database([first_author], "", book, gdrive=True)
|
||||
|
||||
return error
|
||||
|
||||
|
||||
@@ -493,10 +540,7 @@ def reset_password(user_id):
|
||||
def generate_random_password():
|
||||
s = "abcdefghijklmnopqrstuvwxyz01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%&*()?"
|
||||
passlen = 8
|
||||
if sys.version_info < (3, 0):
|
||||
return "".join(s[ord(c) % len(s)] for c in os.urandom(passlen))
|
||||
else:
|
||||
return "".join(s[c % len(s)] for c in os.urandom(passlen))
|
||||
return "".join(s[c % len(s)] for c in os.urandom(passlen))
|
||||
|
||||
|
||||
def uniq(inpt):
|
||||
@@ -536,11 +580,20 @@ def valid_email(email):
|
||||
# ################################# External interface #################################
|
||||
|
||||
|
||||
def update_dir_stucture(book_id, calibrepath, first_author=None, orignal_filepath=None, db_filename=None):
|
||||
def update_dir_structure(book_id,
|
||||
calibrepath,
|
||||
first_author=None,
|
||||
orignal_filepath=None,
|
||||
db_filename=None,
|
||||
renamed_author=False):
|
||||
if config.config_use_google_drive:
|
||||
return update_dir_structure_gdrive(book_id, first_author)
|
||||
return update_dir_structure_gdrive(book_id, first_author, renamed_author)
|
||||
else:
|
||||
return update_dir_structure_file(book_id, calibrepath, first_author, orignal_filepath, db_filename)
|
||||
return update_dir_structure_file(book_id,
|
||||
calibrepath,
|
||||
first_author,
|
||||
orignal_filepath,
|
||||
db_filename, renamed_author)
|
||||
|
||||
|
||||
def delete_book(book, calibrepath, book_format):
|
||||
@@ -659,10 +712,17 @@ def get_series_thumbnail(series_id, resolution):
|
||||
# saves book cover from url
|
||||
def save_cover_from_url(url, book_path):
|
||||
try:
|
||||
if not cli.allow_localhost:
|
||||
# 127.0.x.x, localhost, [::1], [::ffff:7f00:1]
|
||||
ip = socket.getaddrinfo(urlparse(url).hostname, 0)[0][4][0]
|
||||
if ip.startswith("127.") or ip.startswith('::ffff:7f') or ip == "::1":
|
||||
log.error("Localhost was accessed for cover upload")
|
||||
return False, _("You are not allowed to access localhost for cover uploads")
|
||||
img = requests.get(url, timeout=(10, 200)) # ToDo: Error Handling
|
||||
img.raise_for_status()
|
||||
return save_cover(img, book_path)
|
||||
except (requests.exceptions.HTTPError,
|
||||
except (socket.gaierror,
|
||||
requests.exceptions.HTTPError,
|
||||
requests.exceptions.ConnectionError,
|
||||
requests.exceptions.Timeout) as ex:
|
||||
log.info(u'Cover Download Error %s', ex)
|
||||
@@ -709,13 +769,17 @@ def save_cover(img, book_path):
|
||||
return False, _("Only jpg/jpeg/png/webp/bmp files are supported as coverfile")
|
||||
# convert to jpg because calibre only supports jpg
|
||||
if content_type != 'image/jpg':
|
||||
if hasattr(img, 'stream'):
|
||||
imgc = Image(blob=img.stream)
|
||||
else:
|
||||
imgc = Image(blob=io.BytesIO(img.content))
|
||||
imgc.format = 'jpeg'
|
||||
imgc.transform_colorspace("rgb")
|
||||
img = imgc
|
||||
try:
|
||||
if hasattr(img, 'stream'):
|
||||
imgc = Image(blob=img.stream)
|
||||
else:
|
||||
imgc = Image(blob=io.BytesIO(img.content))
|
||||
imgc.format = 'jpeg'
|
||||
imgc.transform_colorspace("rgb")
|
||||
img = imgc
|
||||
except (BlobError, MissingDelegateError):
|
||||
log.error("Invalid cover file content")
|
||||
return False, _("Invalid cover file content")
|
||||
else:
|
||||
if content_type not in 'image/jpeg':
|
||||
log.error("Only jpg/jpeg files are supported as coverfile")
|
||||
@@ -740,9 +804,9 @@ def save_cover(img, book_path):
|
||||
|
||||
def do_download_file(book, book_format, client, data, headers):
|
||||
if config.config_use_google_drive:
|
||||
startTime = time.time()
|
||||
#startTime = time.time()
|
||||
df = gd.getFileFromEbooksFolder(book.path, data.name + "." + book_format)
|
||||
log.debug('%s', time.time() - startTime)
|
||||
#log.debug('%s', time.time() - startTime)
|
||||
if df:
|
||||
return gd.do_gdrive_download(df, headers)
|
||||
else:
|
||||
@@ -774,8 +838,6 @@ def check_unrar(unrarLocation):
|
||||
return _('Unrar binary file not found')
|
||||
|
||||
try:
|
||||
if sys.version_info < (3, 0):
|
||||
unrarLocation = unrarLocation.encode(sys.getfilesystemencoding())
|
||||
unrarLocation = [unrarLocation]
|
||||
value = process_wait(unrarLocation, pattern='UNRAR (.*) freeware')
|
||||
if value:
|
||||
@@ -902,7 +964,7 @@ def get_cc_columns(filter_config_custom_read=False):
|
||||
|
||||
def get_download_link(book_id, book_format, client):
|
||||
book_format = book_format.split(".")[0]
|
||||
book = calibre_db.get_filtered_book(book_id)
|
||||
book = calibre_db.get_filtered_book(book_id, allow_show_archived=True)
|
||||
if book:
|
||||
data1 = calibre_db.get_book_format(book.id, book_format.upper())
|
||||
else:
|
||||
|
Reference in New Issue
Block a user