mirror of
https://github.com/janeczku/calibre-web
synced 2025-10-13 14:47:40 +00:00
Refactored helper.py and db.py
This commit is contained in:
169
cps/helper.py
169
cps/helper.py
@@ -74,8 +74,8 @@ log = logger.create()
|
||||
|
||||
# Convert existing book entry to new format
|
||||
def convert_book_format(book_id, calibrepath, old_book_format, new_book_format, user_id, kindle_mail=None):
|
||||
book = calibre_db.session.query(db.Books).filter(db.Books.id == book_id).first()
|
||||
data = calibre_db.session.query(db.Data).filter(db.Data.book == book.id).filter(db.Data.format == old_book_format).first()
|
||||
book = calibre_db.get_book(book_id)
|
||||
data = calibre_db.get_book_format(book.id, old_book_format)
|
||||
if not data:
|
||||
error_message = _(u"%(format)s format not found for book id: %(book)d", format=old_book_format, book=book_id)
|
||||
log.error("convert_book_format: %s", error_message)
|
||||
@@ -212,7 +212,7 @@ def check_read_formats(entry):
|
||||
# 3: If Pdf file is existing, it's directly send to kindle email
|
||||
def send_mail(book_id, book_format, convert, kindle_mail, calibrepath, user_id):
|
||||
"""Send email with attachments"""
|
||||
book = calibre_db.session.query(db.Books).filter(db.Books.id == book_id).first()
|
||||
book = calibre_db.get_book(book_id)
|
||||
|
||||
if convert == 1:
|
||||
# returns None if success, otherwise errormessage
|
||||
@@ -324,7 +324,7 @@ def delete_book_file(book, calibrepath, book_format=None):
|
||||
|
||||
|
||||
def update_dir_structure_file(book_id, calibrepath, first_author):
|
||||
localbook = calibre_db.session.query(db.Books).filter(db.Books.id == book_id).first()
|
||||
localbook = calibre_db.get_book(book_id)
|
||||
path = os.path.join(calibrepath, localbook.path)
|
||||
|
||||
authordir = localbook.path.split('/')[0]
|
||||
@@ -383,7 +383,7 @@ def update_dir_structure_file(book_id, calibrepath, first_author):
|
||||
|
||||
def update_dir_structure_gdrive(book_id, first_author):
|
||||
error = False
|
||||
book = calibre_db.session.query(db.Books).filter(db.Books.id == book_id).first()
|
||||
book = calibre_db.get_book(book_id)
|
||||
path = book.path
|
||||
|
||||
authordir = book.path.split('/')[0]
|
||||
@@ -494,18 +494,17 @@ def get_cover_on_failure(use_generic_cover):
|
||||
|
||||
|
||||
def get_book_cover(book_id):
|
||||
book = calibre_db.session.query(db.Books).filter(db.Books.id == book_id).filter(common_filters(allow_show_archived=True)).first()
|
||||
book = calibre_db.get_filtered_book(book_id, allow_show_archived=True)
|
||||
return get_book_cover_internal(book, use_generic_cover_on_failure=True)
|
||||
|
||||
|
||||
def get_book_cover_with_uuid(book_uuid,
|
||||
use_generic_cover_on_failure=True):
|
||||
book = calibre_db.session.query(db.Books).filter(db.Books.uuid == book_uuid).first()
|
||||
book = calibre_db.get_book_by_uuid(book_uuid)
|
||||
return get_book_cover_internal(book, use_generic_cover_on_failure)
|
||||
|
||||
|
||||
def get_book_cover_internal(book,
|
||||
use_generic_cover_on_failure):
|
||||
def get_book_cover_internal(book, use_generic_cover_on_failure):
|
||||
if book and book.has_cover:
|
||||
if config.config_use_google_drive:
|
||||
try:
|
||||
@@ -725,66 +724,12 @@ def render_task_status(tasklist):
|
||||
return renderedtasklist
|
||||
|
||||
|
||||
# Language and content filters for displaying in the UI
|
||||
def common_filters(allow_show_archived=False):
|
||||
if not allow_show_archived:
|
||||
archived_books = (
|
||||
ub.session.query(ub.ArchivedBook)
|
||||
.filter(ub.ArchivedBook.user_id == int(current_user.id))
|
||||
.filter(ub.ArchivedBook.is_archived == True)
|
||||
.all()
|
||||
)
|
||||
archived_book_ids = [archived_book.book_id for archived_book in archived_books]
|
||||
archived_filter = db.Books.id.notin_(archived_book_ids)
|
||||
else:
|
||||
archived_filter = true()
|
||||
|
||||
if current_user.filter_language() != "all":
|
||||
lang_filter = db.Books.languages.any(db.Languages.lang_code == current_user.filter_language())
|
||||
else:
|
||||
lang_filter = true()
|
||||
negtags_list = current_user.list_denied_tags()
|
||||
postags_list = current_user.list_allowed_tags()
|
||||
neg_content_tags_filter = false() if negtags_list == [''] else db.Books.tags.any(db.Tags.name.in_(negtags_list))
|
||||
pos_content_tags_filter = true() if postags_list == [''] else db.Books.tags.any(db.Tags.name.in_(postags_list))
|
||||
if config.config_restricted_column:
|
||||
pos_cc_list = current_user.allowed_column_value.split(',')
|
||||
pos_content_cc_filter = true() if pos_cc_list == [''] else \
|
||||
getattr(db.Books, 'custom_column_' + str(config.config_restricted_column)).\
|
||||
any(db.cc_classes[config.config_restricted_column].value.in_(pos_cc_list))
|
||||
neg_cc_list = current_user.denied_column_value.split(',')
|
||||
neg_content_cc_filter = false() if neg_cc_list == [''] else \
|
||||
getattr(db.Books, 'custom_column_' + str(config.config_restricted_column)).\
|
||||
any(db.cc_classes[config.config_restricted_column].value.in_(neg_cc_list))
|
||||
else:
|
||||
pos_content_cc_filter = true()
|
||||
neg_content_cc_filter = false()
|
||||
return and_(lang_filter, pos_content_tags_filter, ~neg_content_tags_filter,
|
||||
pos_content_cc_filter, ~neg_content_cc_filter, archived_filter)
|
||||
|
||||
|
||||
def tags_filters():
|
||||
negtags_list = current_user.list_denied_tags()
|
||||
postags_list = current_user.list_allowed_tags()
|
||||
neg_content_tags_filter = false() if negtags_list == [''] else db.Tags.name.in_(negtags_list)
|
||||
pos_content_tags_filter = true() if postags_list == [''] else db.Tags.name.in_(postags_list)
|
||||
return and_(pos_content_tags_filter, ~neg_content_tags_filter)
|
||||
# return ~(false()) if postags_list == [''] else db.Tags.in_(postags_list)
|
||||
|
||||
|
||||
# Creates for all stored languages a translated speaking name in the array for the UI
|
||||
def speaking_language(languages=None):
|
||||
if not languages:
|
||||
languages = calibre_db.session.query(db.Languages).join(db.books_languages_link).join(db.Books)\
|
||||
.filter(common_filters())\
|
||||
.group_by(text('books_languages_link.lang_code')).all()
|
||||
for lang in languages:
|
||||
try:
|
||||
cur_l = LC.parse(lang.lang_code)
|
||||
lang.name = cur_l.get_language_name(get_locale())
|
||||
except UnknownLocaleError:
|
||||
lang.name = _(isoLanguages.get(part3=lang.lang_code).name)
|
||||
return languages
|
||||
|
||||
|
||||
# checks if domain is in database (including wildcards)
|
||||
@@ -801,76 +746,9 @@ def check_valid_domain(domain_text):
|
||||
return not len(result)
|
||||
|
||||
|
||||
# Orders all Authors in the list according to authors sort
|
||||
def order_authors(entry):
|
||||
sort_authors = entry.author_sort.split('&')
|
||||
authors_ordered = list()
|
||||
error = False
|
||||
for auth in sort_authors:
|
||||
# ToDo: How to handle not found authorname
|
||||
result = calibre_db.session.query(db.Authors).filter(db.Authors.sort == auth.lstrip().strip()).first()
|
||||
if not result:
|
||||
error = True
|
||||
break
|
||||
authors_ordered.append(result)
|
||||
if not error:
|
||||
entry.authors = authors_ordered
|
||||
return entry
|
||||
|
||||
|
||||
# Fill indexpage with all requested data from database
|
||||
def fill_indexpage(page, database, db_filter, order, *join):
|
||||
return fill_indexpage_with_archived_books(page, database, db_filter, order, False, *join)
|
||||
|
||||
|
||||
def fill_indexpage_with_archived_books(page, database, db_filter, order, allow_show_archived, *join):
|
||||
if current_user.show_detail_random():
|
||||
randm = calibre_db.session.query(db.Books).filter(common_filters(allow_show_archived))\
|
||||
.order_by(func.random()).limit(config.config_random_books)
|
||||
else:
|
||||
randm = false()
|
||||
off = int(int(config.config_books_per_page) * (page - 1))
|
||||
query = calibre_db.session.query(database).join(*join, isouter=True).\
|
||||
filter(db_filter).\
|
||||
filter(common_filters(allow_show_archived))
|
||||
pagination = Pagination(page, config.config_books_per_page,
|
||||
len(query.all()))
|
||||
entries = query.order_by(*order).offset(off).limit(config.config_books_per_page).all()
|
||||
for book in entries:
|
||||
book = order_authors(book)
|
||||
return entries, randm, pagination
|
||||
|
||||
|
||||
def get_typeahead(database, query, replace=('', ''), tag_filter=true()):
|
||||
query = query or ''
|
||||
calibre_db.session.connection().connection.connection.create_function("lower", 1, lcase)
|
||||
entries = calibre_db.session.query(database).filter(tag_filter).\
|
||||
filter(func.lower(database.name).ilike("%" + query + "%")).all()
|
||||
json_dumps = json.dumps([dict(name=r.name.replace(*replace)) for r in entries])
|
||||
return json_dumps
|
||||
|
||||
|
||||
# read search results from calibre-database and return it (function is used for feed and simple search
|
||||
def get_search_results(term):
|
||||
calibre_db.session.connection().connection.connection.create_function("lower", 1, lcase)
|
||||
q = list()
|
||||
authorterms = re.split("[, ]+", term)
|
||||
for authorterm in authorterms:
|
||||
q.append(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + authorterm + "%")))
|
||||
|
||||
db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + term + "%"))
|
||||
|
||||
return calibre_db.session.query(db.Books).filter(common_filters()).filter(
|
||||
or_(db.Books.tags.any(func.lower(db.Tags.name).ilike("%" + term + "%")),
|
||||
db.Books.series.any(func.lower(db.Series.name).ilike("%" + term + "%")),
|
||||
db.Books.authors.any(and_(*q)),
|
||||
db.Books.publishers.any(func.lower(db.Publishers.name).ilike("%" + term + "%")),
|
||||
func.lower(db.Books.title).ilike("%" + term + "%")
|
||||
)).order_by(db.Books.sort).all()
|
||||
|
||||
|
||||
def get_cc_columns(filter_config_custom_read=False):
|
||||
tmpcc = calibre_db.session.query(db.Custom_Columns).filter(db.Custom_Columns.datatype.notin_(db.cc_exceptions)).all()
|
||||
tmpcc = calibre_db.session.query(db.Custom_Columns)\
|
||||
.filter(db.Custom_Columns.datatype.notin_(db.cc_exceptions)).all()
|
||||
cc = []
|
||||
r = None
|
||||
if config.config_columns_to_ignore:
|
||||
@@ -887,10 +765,9 @@ def get_cc_columns(filter_config_custom_read=False):
|
||||
|
||||
def get_download_link(book_id, book_format, client):
|
||||
book_format = book_format.split(".")[0]
|
||||
book = calibre_db.session.query(db.Books).filter(db.Books.id == book_id).filter(common_filters()).first()
|
||||
book = calibre_db.get_filtered_book(book_id)
|
||||
if book:
|
||||
data1 = calibre_db.session.query(db.Data).filter(db.Data.book == book.id)\
|
||||
.filter(db.Data.format == book_format.upper()).first()
|
||||
data1 = data = calibre_db.get_book_format(book.id, book_format.upper())
|
||||
else:
|
||||
abort(404)
|
||||
if data1:
|
||||
@@ -908,25 +785,3 @@ def get_download_link(book_id, book_format, client):
|
||||
return do_download_file(book, book_format, client, data1, headers)
|
||||
else:
|
||||
abort(404)
|
||||
|
||||
|
||||
def check_exists_book(authr, title):
|
||||
calibre_db.session.connection().connection.connection.create_function("lower", 1, lcase)
|
||||
q = list()
|
||||
authorterms = re.split(r'\s*&\s*', authr)
|
||||
for authorterm in authorterms:
|
||||
q.append(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + authorterm + "%")))
|
||||
|
||||
return calibre_db.session.query(db.Books).filter(
|
||||
and_(db.Books.authors.any(and_(*q)),
|
||||
func.lower(db.Books.title).ilike("%" + title + "%")
|
||||
)).first()
|
||||
|
||||
############### Database Helper functions
|
||||
|
||||
|
||||
def lcase(s):
|
||||
try:
|
||||
return unidecode.unidecode(s.lower())
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
Reference in New Issue
Block a user