1
0
mirror of https://github.com/janeczku/calibre-web synced 2025-01-20 06:02:55 +00:00
calibre-web/cps/web.py

1664 lines
81 KiB
Python
Raw Normal View History

# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
# Copyright (C) 2018-2019 OzzieIsaacs, cervinko, jkrehm, bodybybuddha, ok11,
# andy29485, idalin, Kyosfonica, wuqi, Kennyl, lemmsh,
# falgh1, grunjol, csitko, ytils, xybydy, trasba, vrabe,
# ruben-herold, marblepebble, JackED42, SiphonSquirrel,
# apetresc, nanu-c, mutschler
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import json
import mimetypes
import chardet # dependency of requests
import copy
from importlib.metadata import metadata
2018-11-03 12:43:38 +00:00
from flask import Blueprint, jsonify, request, redirect, send_from_directory, make_response, flash, abort, url_for
from flask import session as flask_session
from flask_babel import gettext as _
from flask_babel import get_locale
from .cw_login import login_user, logout_user, current_user
from flask_limiter import RateLimitExceeded
2023-02-12 12:10:00 +00:00
from flask_limiter.util import get_remote_address
from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError
2022-07-16 17:27:44 +00:00
from sqlalchemy.sql.expression import text, func, false, not_, and_, or_
2020-07-08 19:18:38 +00:00
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.sql.functions import coalesce
from werkzeug.datastructures import Headers
from werkzeug.security import generate_password_hash, check_password_hash
from . import constants, logger, isoLanguages, services
from . import db, ub, config, app
2021-10-10 08:23:58 +00:00
from . import calibre_db, kobo_sync_status
from .search import render_search_results, render_adv_search_results
from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download
from .helper import check_valid_domain, check_email, check_username, \
get_book_cover, get_series_cover_thumbnail, get_download_link, send_mail, generate_random_password, \
send_registration_mail, check_send_to_ereader, check_read_formats, tags_filters, reset_password, valid_email, \
edit_book_read_status, valid_password
from .pagination import Pagination
from .redirect import get_redirect_location
from .cw_babel import get_available_locale
from .usermanagement import login_required_if_no_ano
from .kobo_sync_status import remove_synced_book
from .render_template import render_title_template
2021-12-05 12:04:13 +00:00
from .kobo_sync_status import change_archived_books
2022-07-02 17:12:18 +00:00
from . import limiter
2023-02-04 13:51:41 +00:00
from .services.worker import WorkerThread
from .tasks_status import render_task_status
from .usermanagement import user_login_required
from .string_helper import strip_whitespaces
2022-06-16 09:15:17 +00:00
feature_support = {
'ldap': bool(services.ldap),
'goodreads': bool(services.goodreads_support),
'kobo': bool(services.kobo)
}
try:
from .oauth_bb import oauth_check, register_user_with_oauth, logout_oauth_user, get_oauth_status
feature_support['oauth'] = True
except ImportError:
feature_support['oauth'] = False
oauth_check = {}
register_user_with_oauth = logout_oauth_user = get_oauth_status = None
from functools import wraps
try:
from natsort import natsorted as sort
except ImportError:
sort = sorted # Just use regular sort then, may cause issues with badly named pages in cbz/cbr files
2018-10-01 08:35:13 +00:00
sql_version = metadata("sqlalchemy")["Version"]
sqlalchemy_version2 = ([int(x) if x.isnumeric() else 0 for x in sql_version.split('.')[:3]] >= [2, 0, 0])
2024-09-07 17:59:02 +00:00
@app.after_request
def add_security_headers(resp):
default_src = ([host.strip() for host in config.config_trustedhosts.split(',') if host] +
["'self'", "'unsafe-inline'", "'unsafe-eval'"])
csp = "default-src " + ' '.join(default_src)
if request.endpoint == "web.read_book" and config.config_use_google_drive:
csp +=" blob: "
csp += "; font-src 'self' data:"
if request.endpoint == "web.read_book":
csp += " blob: "
csp += "; img-src 'self'"
if request.path.startswith("/author/") and config.config_use_goodreads:
csp += " images.gr-assets.com i.gr-assets.com s.gr-assets.com"
csp += " data:"
if request.endpoint == "edit-book.show_edit_book" or config.config_use_google_drive:
csp += " *"
if request.endpoint == "web.read_book":
csp += " blob: ; style-src-elem 'self' blob: 'unsafe-inline'"
csp += "; object-src 'none';"
resp.headers['Content-Security-Policy'] = csp
resp.headers['X-Content-Type-Options'] = 'nosniff'
resp.headers['X-Frame-Options'] = 'SAMEORIGIN'
resp.headers['X-XSS-Protection'] = '1; mode=block'
resp.headers['Strict-Transport-Security'] = 'max-age=31536000';
return resp
web = Blueprint('web', __name__)
log = logger.create()
# ################################### Login logic and rights management ###############################################
2016-04-27 08:35:23 +00:00
def download_required(f):
@wraps(f)
def inner(*args, **kwargs):
if current_user.role_download():
2016-04-27 08:35:23 +00:00
return f(*args, **kwargs)
abort(403)
2016-04-27 08:35:23 +00:00
return inner
def viewer_required(f):
@wraps(f)
def inner(*args, **kwargs):
if current_user.role_viewer():
return f(*args, **kwargs)
abort(403)
return inner
# ################################### data provider functions #########################################################
2017-01-15 11:37:58 +00:00
@web.route("/ajax/emailstat")
@user_login_required
def get_email_status_json():
tasks = WorkerThread.get_instance().tasks
return jsonify(render_task_status(tasks))
@web.route("/ajax/bookmark/<int:book_id>/<book_format>", methods=['POST'])
@user_login_required
2022-03-13 09:23:13 +00:00
def set_bookmark(book_id, book_format):
bookmark_key = request.form["bookmark"]
ub.session.query(ub.Bookmark).filter(and_(ub.Bookmark.user_id == int(current_user.id),
ub.Bookmark.book_id == book_id,
ub.Bookmark.format == book_format)).delete()
if not bookmark_key:
ub.session_commit()
return "", 204
l_bookmark = ub.Bookmark(user_id=current_user.id,
book_id=book_id,
format=book_format,
bookmark_key=bookmark_key)
ub.session.merge(l_bookmark)
ub.session_commit("Bookmark for user {} in book {} created".format(current_user.id, book_id))
return "", 201
@web.route("/ajax/toggleread/<int:book_id>", methods=['POST'])
@user_login_required
def toggle_read(book_id):
message = edit_book_read_status(book_id)
if message:
return message, 400
else:
return message
@web.route("/ajax/togglearchived/<int:book_id>", methods=['POST'])
@user_login_required
def toggle_archived(book_id):
change_archived_books(book_id, message="Book {} archive bit toggled".format(book_id))
# Remove book from syncd books list to force resync (?)
remove_synced_book(book_id)
return ""
@web.route("/ajax/view", methods=["POST"])
@login_required_if_no_ano
def update_view():
to_save = request.get_json()
2020-09-27 10:37:41 +00:00
try:
for element in to_save:
for param in to_save[element]:
current_user.set_view_property(element, param, to_save[element][param])
except Exception as ex:
log.error("Could not save view_settings: %r %r: %e", request, to_save, ex)
return "Invalid request", 400
2020-08-23 07:13:44 +00:00
return "1", 200
'''
@web.route("/ajax/getcomic/<int:book_id>/<book_format>/<int:page>")
@user_login_required
2017-11-18 09:34:21 +00:00
def get_comic_book(book_id, book_format, page):
2020-05-23 08:16:29 +00:00
book = calibre_db.get_book(book_id)
2017-11-18 09:34:21 +00:00
if not book:
return "", 204
2017-11-18 09:34:21 +00:00
else:
for bookformat in book.data:
if bookformat.format.lower() == book_format.lower():
cbr_file = os.path.join(config.config_calibre_dir, book.path, bookformat.name) + "." + book_format
2017-12-15 17:14:20 +00:00
if book_format in ("cbr", "rar"):
if feature_support['rar'] == True:
2017-11-18 09:34:21 +00:00
rarfile.UNRAR_TOOL = config.config_rarfile_location
try:
rf = rarfile.RarFile(cbr_file)
2018-01-11 13:43:39 +00:00
names = sort(rf.namelist())
extract = lambda page: rf.read(names[page])
2017-11-18 09:34:21 +00:00
except:
# rarfile not valid
log.error('Unrar binary not found, or unable to decompress file %s', cbr_file)
return "", 204
2017-11-18 09:34:21 +00:00
else:
log.info('Unrar is not supported please install python rarfile extension')
2017-11-18 09:34:21 +00:00
# no support means return nothing
return "", 204
2018-01-11 13:43:39 +00:00
elif book_format in ("cbz", "zip"):
2017-11-18 09:34:21 +00:00
zf = zipfile.ZipFile(cbr_file)
2018-01-11 13:43:39 +00:00
names=sort(zf.namelist())
extract = lambda page: zf.read(names[page])
elif book_format in ("cbt", "tar"):
2017-12-15 17:14:20 +00:00
tf = tarfile.TarFile(cbr_file)
2018-01-11 13:43:39 +00:00
names=sort(tf.getnames())
extract = lambda page: tf.extractfile(names[page]).read()
else:
log.error('unsupported comic format')
2018-01-11 13:43:39 +00:00
return "", 204
2021-10-04 16:26:46 +00:00
b64 = codecs.encode(extract(page), 'base64').decode()
2018-01-11 13:43:39 +00:00
ext = names[page].rpartition('.')[-1]
if ext not in ('png', 'gif', 'jpg', 'jpeg', 'webp'):
2018-01-11 13:43:39 +00:00
ext = 'png'
extractedfile="data:image/" + ext + ";base64," + b64
fileData={"name": names[page], "page":page, "last":len(names)-1, "content": extractedfile}
2017-11-18 09:34:21 +00:00
return make_response(json.dumps(fileData))
return "", 204
'''
2017-11-18 09:34:21 +00:00
# ################################### Typeahead ##################################################################
@web.route("/get_authors_json", methods=['GET'])
2016-04-27 08:35:23 +00:00
@login_required_if_no_ano
def get_authors_json():
2020-05-23 08:16:29 +00:00
return calibre_db.get_typeahead(db.Authors, request.args.get('q'), ('|', ','))
@web.route("/get_publishers_json", methods=['GET'])
@login_required_if_no_ano
def get_publishers_json():
2020-05-23 08:16:29 +00:00
return calibre_db.get_typeahead(db.Publishers, request.args.get('q'), ('|', ','))
@web.route("/get_tags_json", methods=['GET'])
2016-04-27 08:35:23 +00:00
@login_required_if_no_ano
def get_tags_json():
2020-05-23 08:16:29 +00:00
return calibre_db.get_typeahead(db.Tags, request.args.get('q'), tag_filter=tags_filters())
@web.route("/get_series_json", methods=['GET'])
@login_required_if_no_ano
def get_series_json():
2020-05-23 08:16:29 +00:00
return calibre_db.get_typeahead(db.Series, request.args.get('q'))
2017-04-14 18:29:11 +00:00
@web.route("/get_languages_json", methods=['GET'])
@login_required_if_no_ano
def get_languages_json():
query = (request.args.get('q') or '').lower()
language_names = isoLanguages.get_language_names(get_locale())
entries_start = [s for key, s in language_names.items() if s.lower().startswith(query.lower())]
if len(entries_start) < 5:
entries = [s for key, s in language_names.items() if query in s.lower()]
entries_start.extend(entries[0:(5 - len(entries_start))])
entries_start = list(set(entries_start))
json_dumps = json.dumps([dict(name=r) for r in entries_start[0:5]])
return json_dumps
@web.route("/get_matching_tags", methods=['GET'])
2016-05-02 20:43:50 +00:00
@login_required_if_no_ano
def get_matching_tags():
2016-05-02 20:43:50 +00:00
tag_dict = {'tags': []}
2020-05-27 17:19:17 +00:00
q = calibre_db.session.query(db.Books).filter(calibre_db.common_filters(True))
calibre_db.create_functions()
# calibre_db.session.connection().connection.connection.create_function("lower", 1, db.lcase)
author_input = request.args.get('authors') or ''
title_input = request.args.get('title') or ''
include_tag_inputs = request.args.getlist('include_tag') or ''
exclude_tag_inputs = request.args.getlist('exclude_tag') or ''
q = q.filter(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + author_input + "%")),
func.lower(db.Books.title).ilike("%" + title_input + "%"))
if len(include_tag_inputs) > 0:
for tag in include_tag_inputs:
q = q.filter(db.Books.tags.any(db.Tags.id == tag))
if len(exclude_tag_inputs) > 0:
for tag in exclude_tag_inputs:
q = q.filter(not_(db.Books.tags.any(db.Tags.id == tag)))
for book in q:
for tag in book.tags:
if tag.id not in tag_dict['tags']:
tag_dict['tags'].append(tag.id)
2016-05-02 20:43:50 +00:00
json_dumps = json.dumps(tag_dict)
return json_dumps
2015-10-13 17:06:37 +00:00
def generate_char_list(entries): # data_colum, db_link):
char_list = list()
for entry in entries:
upper_char = entry[0].name[0].upper()
if upper_char not in char_list:
char_list.append(upper_char)
return char_list
def query_char_list(data_colum, db_link):
results = (calibre_db.session.query(func.upper(func.substr(data_colum, 1, 1)).label('char'))
.join(db_link).join(db.Books).filter(calibre_db.common_filters())
.group_by(func.upper(func.substr(data_colum, 1, 1))).all())
return results
def get_sort_function(sort_param, data):
order = [db.Books.timestamp.desc()]
if sort_param == 'stored':
sort_param = current_user.get_view_property(data, 'stored')
else:
current_user.set_view_property(data, 'stored', sort_param)
if sort_param == 'pubnew':
order = [db.Books.pubdate.desc()]
if sort_param == 'pubold':
order = [db.Books.pubdate]
if sort_param == 'abc':
order = [db.Books.sort]
if sort_param == 'zyx':
order = [db.Books.sort.desc()]
if sort_param == 'new':
order = [db.Books.timestamp.desc()]
if sort_param == 'old':
order = [db.Books.timestamp]
if sort_param == 'authaz':
order = [db.Books.author_sort.asc(), db.Series.name, db.Books.series_index]
if sort_param == 'authza':
order = [db.Books.author_sort.desc(), db.Series.name.desc(), db.Books.series_index.desc()]
if sort_param == 'seriesasc':
order = [db.Books.series_index.asc()]
if sort_param == 'seriesdesc':
order = [db.Books.series_index.desc()]
if sort_param == 'hotdesc':
order = [func.count(ub.Downloads.book_id).desc()]
if sort_param == 'hotasc':
order = [func.count(ub.Downloads.book_id).asc()]
if sort_param is None:
sort_param = "new"
return order, sort_param
2021-03-14 12:28:52 +00:00
def render_books_list(data, sort_param, book_id, page):
order = get_sort_function(sort_param, data)
if data == "rated":
2021-03-14 14:06:09 +00:00
return render_rated_books(page, book_id, order=order)
elif data == "discover":
return render_discover_books(book_id)
elif data == "unread":
return render_read_books(page, False, order=order)
elif data == "read":
return render_read_books(page, True, order=order)
elif data == "hot":
return render_hot_books(page, order)
elif data == "download":
return render_downloaded_books(page, order, book_id)
elif data == "author":
return render_author_books(page, book_id, order)
elif data == "publisher":
return render_publisher_books(page, book_id, order)
elif data == "series":
return render_series_books(page, book_id, order)
elif data == "ratings":
return render_ratings_books(page, book_id, order)
elif data == "formats":
return render_formats_books(page, book_id, order)
elif data == "category":
return render_category_books(page, book_id, order)
elif data == "language":
return render_language_books(page, book_id, order)
elif data == "archived":
2020-12-23 08:07:49 +00:00
return render_archived_books(page, order)
elif data == "search":
term = request.args.get('query', None)
offset = int(int(config.config_books_per_page) * (page - 1))
return render_search_results(term, offset, order, config.config_books_per_page)
elif data == "advsearch":
term = json.loads(flask_session.get('query', '{}'))
offset = int(int(config.config_books_per_page) * (page - 1))
return render_adv_search_results(term, offset, order, config.config_books_per_page)
2017-11-12 13:06:33 +00:00
else:
website = data or "newest"
entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, True, order[0],
True, config.config_read_column,
db.books_series_link,
db.Books.id == db.books_series_link.c.book,
db.Series)
2017-11-12 13:06:33 +00:00
return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
title=_("Books"), page=website, order=order[1])
2021-03-14 14:06:09 +00:00
def render_rated_books(page, book_id, order):
if current_user.check_visibility(constants.SIDEBAR_BEST_RATED):
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db.Books.ratings.any(db.Ratings.rating > 9),
order[0],
True, config.config_read_column,
db.books_series_link,
db.Books.id == db.books_series_link.c.book,
db.Series)
2021-03-14 14:06:09 +00:00
return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
id=book_id, title=_("Top Rated Books"), page="rated", order=order[1])
2021-03-14 14:06:09 +00:00
else:
abort(404)
def render_discover_books(book_id):
2021-03-14 14:06:09 +00:00
if current_user.check_visibility(constants.SIDEBAR_RANDOM):
entries, __, ___ = calibre_db.fill_indexpage(1, 0, db.Books, True, [func.randomblob(2)],
join_archive_read=True,
config_read_column=config.config_read_column)
2021-03-14 14:06:09 +00:00
pagination = Pagination(1, config.config_books_per_page, config.config_books_per_page)
return render_title_template('index.html', random=false(), entries=entries, pagination=pagination, id=book_id,
title=_("Discover (Random Books)"), page="discover")
2021-03-14 14:06:09 +00:00
else:
abort(404)
def render_hot_books(page, order):
if current_user.check_visibility(constants.SIDEBAR_HOT):
if order[1] not in ['hotasc', 'hotdesc']:
2022-07-01 13:26:06 +00:00
# Unary expression comparison only working (for this expression) in sqlalchemy 1.4+
# if not (order[0][0].compare(func.count(ub.Downloads.book_id).desc()) or
# order[0][0].compare(func.count(ub.Downloads.book_id).asc())):
order = [func.count(ub.Downloads.book_id).desc()], 'hotdesc'
2017-11-12 13:06:33 +00:00
if current_user.show_detail_random():
random_query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
random = (random_query.filter(calibre_db.common_filters())
.order_by(func.random())
.limit(config.config_random_books).all())
else:
2018-08-28 07:42:19 +00:00
random = false()
2017-11-12 13:06:33 +00:00
off = int(int(config.config_books_per_page) * (page - 1))
all_books = ub.session.query(ub.Downloads, func.count(ub.Downloads.book_id)) \
.order_by(*order[0]).group_by(ub.Downloads.book_id)
2017-11-12 13:06:33 +00:00
hot_books = all_books.offset(off).limit(config.config_books_per_page)
entries = list()
for book in hot_books:
query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
download_book = query.filter(calibre_db.common_filters()).filter(
book.Downloads.book_id == db.Books.id).first()
if download_book:
entries.append(download_book)
2017-11-12 13:06:33 +00:00
else:
ub.delete_download(book.Downloads.book_id)
num_books = entries.__len__()
pagination = Pagination(page, config.config_books_per_page, num_books)
2017-11-12 13:06:33 +00:00
return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
title=_("Hot Books (Most Downloaded)"), page="hot", order=order[1])
2017-11-12 13:06:33 +00:00
else:
abort(404)
def render_downloaded_books(page, order, user_id):
if current_user.role_admin():
user_id = int(user_id)
else:
user_id = current_user.id
user = ub.session.query(ub.User).filter(ub.User.id == user_id).first()
if current_user.check_visibility(constants.SIDEBAR_DOWNLOAD) and user:
entries, random, pagination = calibre_db.fill_indexpage(page,
0,
db.Books,
ub.Downloads.user_id == user_id,
order[0],
True, config.config_read_column,
db.books_series_link,
db.Books.id == db.books_series_link.c.book,
db.Series,
ub.Downloads, db.Books.id == ub.Downloads.book_id)
for book in entries:
if not (calibre_db.session.query(db.Books).filter(calibre_db.common_filters())
.filter(db.Books.id == book.Books.id).first()):
ub.delete_download(book.Books.id)
return render_title_template('index.html',
random=random,
entries=entries,
pagination=pagination,
id=user_id,
title=_("Downloaded books by %(user)s", user=user.name),
page="download",
order=order[1])
else:
abort(404)
def render_author_books(page, author_id, order):
2020-06-06 19:21:10 +00:00
entries, __, pagination = calibre_db.fill_indexpage(page, 0,
2020-05-23 08:16:29 +00:00
db.Books,
db.Books.authors.any(db.Authors.id == author_id),
[order[0][0], db.Series.name, db.Books.series_index],
True, config.config_read_column,
2020-05-23 08:16:29 +00:00
db.books_series_link,
db.books_series_link.c.book == db.Books.id,
2020-05-23 08:16:29 +00:00
db.Series)
if entries is None or not len(entries):
flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
category="error")
return redirect(url_for("web.index"))
if sqlalchemy_version2:
author = calibre_db.session.get(db.Authors, author_id)
else:
author = calibre_db.session.query(db.Authors).get(author_id)
author_name = author.name.replace('|', ',')
author_info = None
other_books = []
2019-08-20 17:12:40 +00:00
if services.goodreads_support and config.config_use_goodreads:
author_info = services.goodreads_support.get_author_info(author_name)
book_entries = [entry.Books for entry in entries]
other_books = services.goodreads_support.get_other_books(author_info, book_entries)
return render_title_template('author.html', entries=entries, pagination=pagination, id=author_id,
title=_("Author: %(name)s", name=author_name), author=author_info,
other_books=other_books, page="author", order=order[1])
def render_publisher_books(page, book_id, order):
if book_id == '-1':
2020-06-06 19:21:10 +00:00
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
2020-05-23 08:16:29 +00:00
db.Books,
db.Publishers.name == None,
[db.Series.name, order[0][0], db.Books.series_index],
True, config.config_read_column,
db.books_publishers_link,
db.Books.id == db.books_publishers_link.c.book,
db.Publishers,
2020-05-23 08:16:29 +00:00
db.books_series_link,
2021-03-22 15:04:53 +00:00
db.Books.id == db.books_series_link.c.book,
2020-05-23 08:16:29 +00:00
db.Series)
publisher = _("None")
else:
publisher = calibre_db.session.query(db.Publishers).filter(db.Publishers.id == book_id).first()
if publisher:
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db.Books.publishers.any(
db.Publishers.id == book_id),
[db.Series.name, order[0][0],
db.Books.series_index],
True, config.config_read_column,
db.books_series_link,
db.Books.id == db.books_series_link.c.book,
db.Series)
publisher = publisher.name
else:
abort(404)
return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=book_id,
title=_("Publisher: %(name)s", name=publisher),
page="publisher",
order=order[1])
def render_series_books(page, book_id, order):
if book_id == '-1':
2020-06-06 19:21:10 +00:00
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
2020-05-23 08:16:29 +00:00
db.Books,
db.Series.name == None,
[order[0][0]],
True, config.config_read_column,
db.books_series_link,
db.Books.id == db.books_series_link.c.book,
db.Series)
series_name = _("None")
else:
series_name = calibre_db.session.query(db.Series).filter(db.Series.id == book_id).first()
if series_name:
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db.Books.series.any(db.Series.id == book_id),
[order[0][0]],
True, config.config_read_column)
series_name = series_name.name
else:
abort(404)
return render_title_template('index.html', random=random, pagination=pagination, entries=entries, id=book_id,
title=_("Series: %(serie)s", serie=series_name), page="series", order=order[1])
def render_ratings_books(page, book_id, order):
if book_id == '-1':
db_filter = coalesce(db.Ratings.rating, 0) < 1
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db_filter,
[order[0][0]],
True, config.config_read_column,
db.books_ratings_link,
db.Books.id == db.books_ratings_link.c.book,
db.Ratings)
title = _("Rating: None")
else:
name = calibre_db.session.query(db.Ratings).filter(db.Ratings.id == book_id).first()
if name:
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db.Books.ratings.any(db.Ratings.id == book_id),
[order[0][0]],
True, config.config_read_column)
title = _("Rating: %(rating)s stars", rating=int(name.rating / 2))
else:
abort(404)
return render_title_template('index.html', random=random, pagination=pagination, entries=entries, id=book_id,
title=title, page="ratings", order=order[1])
def render_formats_books(page, book_id, order):
if book_id == '-1':
name = _("None")
2020-06-06 19:21:10 +00:00
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
2020-05-23 08:16:29 +00:00
db.Books,
db.Data.format == None,
[order[0][0]],
True, config.config_read_column,
db.Data)
else:
name = calibre_db.session.query(db.Data).filter(db.Data.format == book_id.upper()).first()
if name:
name = name.format
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db.Books.data.any(
db.Data.format == book_id.upper()),
[order[0][0]],
True, config.config_read_column)
else:
abort(404)
return render_title_template('index.html', random=random, pagination=pagination, entries=entries, id=book_id,
title=_("File format: %(format)s", format=name),
page="formats",
order=order[1])
def render_category_books(page, book_id, order):
if book_id == '-1':
2020-06-06 19:21:10 +00:00
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
2020-05-23 08:16:29 +00:00
db.Books,
db.Tags.name == None,
[order[0][0], db.Series.name, db.Books.series_index],
True, config.config_read_column,
db.books_tags_link,
db.Books.id == db.books_tags_link.c.book,
db.Tags,
2021-03-22 15:04:53 +00:00
db.books_series_link,
db.Books.id == db.books_series_link.c.book,
db.Series)
tagsname = _("None")
2017-11-12 13:06:33 +00:00
else:
tagsname = calibre_db.session.query(db.Tags).filter(db.Tags.id == book_id).first()
if tagsname:
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db.Books.tags.any(db.Tags.id == book_id),
[order[0][0], db.Series.name,
db.Books.series_index],
True, config.config_read_column,
db.books_series_link,
db.Books.id == db.books_series_link.c.book,
db.Series)
tagsname = tagsname.name
else:
abort(404)
return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=book_id,
title=_("Category: %(name)s", name=tagsname), page="category", order=order[1])
def render_language_books(page, name, order):
try:
if name.lower() != "none":
lang_name = isoLanguages.get_language_name(get_locale(), name)
if lang_name == "Unknown":
abort(404)
else:
lang_name = _("None")
except KeyError:
abort(404)
if name == "none":
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db.Languages.lang_code == None,
[order[0][0]],
True, config.config_read_column,
db.books_languages_link,
db.Books.id == db.books_languages_link.c.book,
db.Languages)
else:
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db.Books.languages.any(db.Languages.lang_code == name),
[order[0][0]],
True, config.config_read_column)
return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=name,
title=_("Language: %(name)s", name=lang_name), page="language", order=order[1])
def render_read_books(page, are_read, as_xml=False, order=None):
sort_param = order[0] if order else []
if not config.config_read_column:
if are_read:
db_filter = and_(ub.ReadBook.user_id == int(current_user.id),
ub.ReadBook.read_status == ub.ReadBook.STATUS_FINISHED)
else:
2021-01-02 06:51:48 +00:00
db_filter = coalesce(ub.ReadBook.read_status, 0) != ub.ReadBook.STATUS_FINISHED
else:
try:
if are_read:
db_filter = db.cc_classes[config.config_read_column].value == True
else:
db_filter = coalesce(db.cc_classes[config.config_read_column].value, False) != True
except (KeyError, AttributeError, IndexError):
log.error("Custom Column No.{} does not exist in calibre database".format(config.config_read_column))
if not as_xml:
flash(_("Custom Column No.%(column)d does not exist in calibre database",
column=config.config_read_column),
category="error")
return redirect(url_for("web.index"))
return [] # ToDo: Handle error Case for opds
2022-03-13 09:23:13 +00:00
entries, random, pagination = calibre_db.fill_indexpage(page, 0,
db.Books,
db_filter,
sort_param,
True, config.config_read_column,
db.books_series_link,
db.Books.id == db.books_series_link.c.book,
db.Series)
if as_xml:
return entries, pagination
else:
if are_read:
2023-01-21 14:23:18 +00:00
name = _('Read Books') + ' (' + str(pagination.total_count) + ')'
page_name = "read"
else:
2023-01-21 14:23:18 +00:00
name = _('Unread Books') + ' (' + str(pagination.total_count) + ')'
page_name = "unread"
return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
title=name, page=page_name, order=order[1])
def render_archived_books(page, sort_param):
order = sort_param[0] or []
archived_books = (ub.session.query(ub.ArchivedBook)
.filter(ub.ArchivedBook.user_id == int(current_user.id))
.filter(ub.ArchivedBook.is_archived == True)
.all())
archived_book_ids = [archived_book.book_id for archived_book in archived_books]
archived_filter = db.Books.id.in_(archived_book_ids)
entries, random, pagination = calibre_db.fill_indexpage_with_archived_books(page, db.Books,
0,
archived_filter,
order,
True,
True, config.config_read_column)
name = _('Archived Books') + ' (' + str(len(entries)) + ')'
page_name = "archived"
return render_title_template('index.html', random=random, entries=entries, pagination=pagination,
title=name, page=page_name, order=sort_param[1])
# ################################### View Books list ##################################################################
@web.route("/", defaults={'page': 1})
@web.route('/page/<int:page>')
@login_required_if_no_ano
def index(page):
sort_param = (request.args.get('sort') or 'stored').lower()
return render_books_list("newest", sort_param, 1, page)
@web.route('/<data>/<sort_param>', defaults={'page': 1, 'book_id': 1})
@web.route('/<data>/<sort_param>/', defaults={'page': 1, 'book_id': 1})
@web.route('/<data>/<sort_param>/<book_id>', defaults={'page': 1})
@web.route('/<data>/<sort_param>/<book_id>/<int:page>')
@login_required_if_no_ano
def books_list(data, sort_param, book_id, page):
return render_books_list(data, sort_param, book_id, page)
2020-05-09 08:58:59 +00:00
@web.route("/table")
@user_login_required
2020-05-09 08:58:59 +00:00
def books_table():
2020-07-08 19:18:38 +00:00
visibility = current_user.view_settings.get('table', {})
cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
return render_title_template('book_table.html', title=_("Books List"), cc=cc, page="book_table",
2020-06-12 14:15:54 +00:00
visiblility=visibility)
2020-06-05 18:13:39 +00:00
2020-06-05 18:13:39 +00:00
@web.route("/ajax/listbooks")
@user_login_required
2020-06-05 18:13:39 +00:00
def list_books():
off = int(request.args.get("offset") or 0)
limit = int(request.args.get("limit") or config.config_books_per_page)
search_param = request.args.get("search")
sort_param = request.args.get("sort", "id")
2021-04-12 16:39:09 +00:00
order = request.args.get("order", "").lower()
state = None
join = tuple()
2021-04-12 16:39:09 +00:00
if sort_param == "state":
state = json.loads(request.args.get("state", "[]"))
elif sort_param == "tags":
order = [db.Tags.name.asc()] if order == "asc" else [db.Tags.name.desc()]
join = db.books_tags_link, db.Books.id == db.books_tags_link.c.book, db.Tags
elif sort_param == "series":
order = [db.Series.name.asc()] if order == "asc" else [db.Series.name.desc()]
join = db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series
elif sort_param == "publishers":
order = [db.Publishers.name.asc()] if order == "asc" else [db.Publishers.name.desc()]
join = db.books_publishers_link, db.Books.id == db.books_publishers_link.c.book, db.Publishers
elif sort_param == "authors":
order = [db.Authors.name.asc(), db.Series.name, db.Books.series_index] if order == "asc" \
else [db.Authors.name.desc(), db.Series.name.desc(), db.Books.series_index.desc()]
join = db.books_authors_link, db.Books.id == db.books_authors_link.c.book, db.Authors, db.books_series_link, \
db.Books.id == db.books_series_link.c.book, db.Series
elif sort_param == "languages":
order = [db.Languages.lang_code.asc()] if order == "asc" else [db.Languages.lang_code.desc()]
join = db.books_languages_link, db.Books.id == db.books_languages_link.c.book, db.Languages
elif order and sort_param in ["sort", "title", "authors_sort", "series_index"]:
order = [text(sort_param + " " + order)]
2021-04-12 16:39:09 +00:00
elif not state:
order = [db.Books.timestamp.desc()]
total_count = filtered_count = calibre_db.session.query(db.Books).filter(
calibre_db.common_filters(allow_show_archived=True)).count()
if state is not None:
if search_param:
books = calibre_db.search_query(search_param, config).all()
filtered_count = len(books)
else:
query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
books = query.filter(calibre_db.common_filters(allow_show_archived=True)).all()
entries = calibre_db.get_checkbox_sorted(books, state, off, limit, order, True)
elif search_param:
entries, filtered_count, __ = calibre_db.get_search_results(search_param,
config,
off,
[order, ''],
limit,
*join)
2020-06-06 19:21:10 +00:00
else:
2021-12-05 12:04:13 +00:00
entries, __, __ = calibre_db.fill_indexpage_with_archived_books((int(off) / (int(limit)) + 1),
db.Books,
limit,
True,
order,
True,
True,
config.config_read_column,
*join)
result = list()
2020-06-11 19:19:09 +00:00
for entry in entries:
val = entry[0]
val.is_archived = entry[1] is True
val.read_status = entry[2] == ub.ReadBook.STATUS_FINISHED
for lang_index in range(0, len(val.languages)):
val.languages[lang_index].language_name = isoLanguages.get_language_name(get_locale(), val.languages[
lang_index].lang_code)
result.append(val)
table_entries = {'totalNotFiltered': total_count, 'total': filtered_count, "rows": result}
2020-06-06 19:21:10 +00:00
js_list = json.dumps(table_entries, cls=db.AlchemyEncoder)
2020-06-05 18:13:39 +00:00
response = make_response(js_list)
response.headers["Content-Type"] = "application/json; charset=utf-8"
return response
2020-06-12 14:15:54 +00:00
@web.route("/ajax/table_settings", methods=['POST'])
@user_login_required
2020-06-12 11:45:07 +00:00
def update_table_settings():
2020-07-08 19:18:38 +00:00
current_user.view_settings['table'] = json.loads(request.data)
try:
try:
flag_modified(current_user, "view_settings")
except AttributeError:
pass
2020-07-08 19:18:38 +00:00
ub.session.commit()
except (InvalidRequestError, OperationalError):
2020-07-08 19:18:38 +00:00
log.error("Invalid request received: %r ", request, )
return "Invalid request", 400
2020-06-12 14:15:54 +00:00
return ""
2020-05-09 08:58:59 +00:00
@web.route("/author")
@login_required_if_no_ano
def author_list():
if current_user.check_visibility(constants.SIDEBAR_AUTHOR):
2020-09-28 19:24:47 +00:00
if current_user.get_view_property('author', 'dir') == 'desc':
2020-09-27 10:37:41 +00:00
order = db.Authors.sort.desc()
order_no = 0
2020-09-28 19:24:47 +00:00
else:
order = db.Authors.sort.asc()
order_no = 1
entries = calibre_db.session.query(db.Authors, func.count('books_authors_link.book').label('count')) \
2020-05-23 08:16:29 +00:00
.join(db.books_authors_link).join(db.Books).filter(calibre_db.common_filters()) \
2020-09-27 10:37:41 +00:00
.group_by(text('books_authors_link.author')).order_by(order).all()
char_list = query_char_list(db.Authors.sort, db.books_authors_link)
2021-04-13 17:47:55 +00:00
# If not creating a copy, readonly databases can not display authornames with "|" in it as changing the name
# starts a change session
author_copy = copy.deepcopy(entries)
for entry in author_copy:
entry.Authors.name = entry.Authors.name.replace('|', ',')
return render_title_template('list.html', entries=author_copy, folder='web.books_list', charlist=char_list,
2023-01-21 14:23:18 +00:00
title="Authors", page="authorlist", data='author', order=order_no)
2017-11-12 13:06:33 +00:00
else:
abort(404)
@web.route("/downloadlist")
@login_required_if_no_ano
def download_list():
if current_user.get_view_property('download', 'dir') == 'desc':
order = ub.User.name.desc()
order_no = 0
else:
order = ub.User.name.asc()
order_no = 1
if current_user.check_visibility(constants.SIDEBAR_DOWNLOAD) and current_user.role_admin():
entries = ub.session.query(ub.User, func.count(ub.Downloads.book_id).label('count')) \
.join(ub.Downloads).group_by(ub.Downloads.user_id).order_by(order).all()
char_list = ub.session.query(func.upper(func.substr(ub.User.name, 1, 1)).label('char')) \
.filter(ub.User.role.op('&')(constants.ROLE_ANONYMOUS) != constants.ROLE_ANONYMOUS) \
.group_by(func.upper(func.substr(ub.User.name, 1, 1))).all()
return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list,
title=_("Downloads"), page="downloadlist", data="download", order=order_no)
else:
abort(404)
@web.route("/publisher")
@login_required_if_no_ano
def publisher_list():
2020-09-28 19:24:47 +00:00
if current_user.get_view_property('publisher', 'dir') == 'desc':
2020-09-27 10:37:41 +00:00
order = db.Publishers.name.desc()
order_no = 0
2020-09-28 19:24:47 +00:00
else:
order = db.Publishers.name.asc()
order_no = 1
if current_user.check_visibility(constants.SIDEBAR_PUBLISHER):
entries = calibre_db.session.query(db.Publishers, func.count('books_publishers_link.book').label('count')) \
2020-05-23 08:16:29 +00:00
.join(db.books_publishers_link).join(db.Books).filter(calibre_db.common_filters()) \
2020-09-27 10:37:41 +00:00
.group_by(text('books_publishers_link.publisher')).order_by(order).all()
no_publisher_count = (calibre_db.session.query(db.Books)
.outerjoin(db.books_publishers_link).outerjoin(db.Publishers)
.filter(db.Publishers.name == None)
.filter(calibre_db.common_filters())
.count())
if no_publisher_count:
entries.append([db.Category(_("None"), "-1"), no_publisher_count])
entries = sorted(entries, key=lambda x: x[0].name.lower(), reverse=not order_no)
char_list = generate_char_list(entries)
return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list,
title=_("Publishers"), page="publisherlist", data="publisher", order=order_no)
else:
abort(404)
@web.route("/series")
@login_required_if_no_ano
def series_list():
if current_user.check_visibility(constants.SIDEBAR_SERIES):
2020-09-28 19:24:47 +00:00
if current_user.get_view_property('series', 'dir') == 'desc':
2020-09-27 10:37:41 +00:00
order = db.Series.sort.desc()
order_no = 0
2020-09-28 19:24:47 +00:00
else:
order = db.Series.sort.asc()
order_no = 1
char_list = query_char_list(db.Series.sort, db.books_series_link)
2020-09-27 10:37:41 +00:00
if current_user.get_view_property('series', 'series_view') == 'list':
entries = calibre_db.session.query(db.Series, func.count('books_series_link.book').label('count')) \
2020-05-23 08:16:29 +00:00
.join(db.books_series_link).join(db.Books).filter(calibre_db.common_filters()) \
2020-09-27 10:37:41 +00:00
.group_by(text('books_series_link.series')).order_by(order).all()
no_series_count = (calibre_db.session.query(db.Books)
.outerjoin(db.books_series_link).outerjoin(db.Series)
.filter(db.Series.name == None)
.filter(calibre_db.common_filters())
.count())
if no_series_count:
entries.append([db.Category(_("None"), "-1"), no_series_count])
entries = sorted(entries, key=lambda x: x[0].name.lower(), reverse=not order_no)
return render_title_template('list.html',
entries=entries,
folder='web.books_list',
charlist=char_list,
title=_("Series"),
page="serieslist",
data="series", order=order_no)
else:
entries = (calibre_db.session.query(db.Books, func.count('books_series_link').label('count'),
func.max(db.Books.series_index), db.Books.id)
.join(db.books_series_link).join(db.Series).filter(calibre_db.common_filters())
.group_by(text('books_series_link.series'))
.having(or_(func.max(db.Books.series_index), db.Books.series_index==""))
.order_by(order)
.all())
return render_title_template('grid.html', entries=entries, folder='web.books_list', charlist=char_list,
title=_("Series"), page="serieslist", data="series", bodyClass="grid-view",
order=order_no)
else:
abort(404)
@web.route("/ratings")
2016-04-27 08:35:23 +00:00
@login_required_if_no_ano
def ratings_list():
if current_user.check_visibility(constants.SIDEBAR_RATING):
2020-09-28 19:24:47 +00:00
if current_user.get_view_property('ratings', 'dir') == 'desc':
2020-09-27 10:37:41 +00:00
order = db.Ratings.rating.desc()
order_no = 0
2020-09-28 19:24:47 +00:00
else:
order = db.Ratings.rating.asc()
order_no = 1
entries = calibre_db.session.query(db.Ratings, func.count('books_ratings_link.book').label('count'),
(db.Ratings.rating / 2).label('name')) \
2020-05-23 08:16:29 +00:00
.join(db.books_ratings_link).join(db.Books).filter(calibre_db.common_filters()) \
.filter(db.Ratings.rating > 0) \
2020-09-27 10:37:41 +00:00
.group_by(text('books_ratings_link.rating')).order_by(order).all()
no_rating_count = (calibre_db.session.query(db.Books)
.outerjoin(db.books_ratings_link).outerjoin(db.Ratings)
.filter(or_(db.Ratings.rating == None, db.Ratings.rating == 0))
.filter(calibre_db.common_filters())
.count())
if no_rating_count:
entries.append([db.Category(_("None"), "-1", -1), no_rating_count])
entries = sorted(entries, key=lambda x: x[0].rating, reverse=not order_no)
return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=list(),
title=_("Ratings list"), page="ratingslist", data="ratings", order=order_no)
2017-11-12 13:06:33 +00:00
else:
abort(404)
@web.route("/formats")
2016-04-27 08:35:23 +00:00
@login_required_if_no_ano
def formats_list():
if current_user.check_visibility(constants.SIDEBAR_FORMAT):
if current_user.get_view_property('formats', 'dir') == 'desc':
2020-09-27 10:37:41 +00:00
order = db.Data.format.desc()
order_no = 0
2020-09-28 19:24:47 +00:00
else:
order = db.Data.format.asc()
order_no = 1
2020-05-23 08:16:29 +00:00
entries = calibre_db.session.query(db.Data,
func.count('data.book').label('count'),
db.Data.format.label('format')) \
.join(db.Books).filter(calibre_db.common_filters()) \
2020-09-27 10:37:41 +00:00
.group_by(db.Data.format).order_by(order).all()
no_format_count = (calibre_db.session.query(db.Books).outerjoin(db.Data)
.filter(db.Data.format == None)
.filter(calibre_db.common_filters())
.count())
if no_format_count:
entries.append([db.Category(_("None"), "-1"), no_format_count])
return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=list(),
title=_("File formats list"), page="formatslist", data="formats", order=order_no)
else:
abort(404)
@web.route("/language")
@login_required_if_no_ano
def language_overview():
2023-01-21 14:23:18 +00:00
if current_user.check_visibility(constants.SIDEBAR_LANGUAGE) and current_user.filter_language() == "all":
order_no = 0 if current_user.get_view_property('language', 'dir') == 'desc' else 1
languages = calibre_db.speaking_language(reverse_order=not order_no, with_count=True)
char_list = generate_char_list(languages)
return render_title_template('list.html', entries=languages, folder='web.books_list', charlist=char_list,
title=_("Languages"), page="langlist", data="language", order=order_no)
else:
2017-11-12 13:06:33 +00:00
abort(404)
@web.route("/category")
@login_required_if_no_ano
def category_list():
if current_user.check_visibility(constants.SIDEBAR_CATEGORY):
2020-09-28 19:24:47 +00:00
if current_user.get_view_property('category', 'dir') == 'desc':
2020-09-27 10:37:41 +00:00
order = db.Tags.name.desc()
order_no = 0
2020-09-28 19:24:47 +00:00
else:
order = db.Tags.name.asc()
order_no = 1
entries = calibre_db.session.query(db.Tags, func.count('books_tags_link.book').label('count')) \
2020-09-27 10:37:41 +00:00
.join(db.books_tags_link).join(db.Books).order_by(order).filter(calibre_db.common_filters()) \
2022-08-29 17:08:04 +00:00
.group_by(db.Tags.id).all()
no_tag_count = (calibre_db.session.query(db.Books)
.outerjoin(db.books_tags_link).outerjoin(db.Tags)
.filter(db.Tags.name == None)
.filter(calibre_db.common_filters())
.count())
if no_tag_count:
entries.append([db.Category(_("None"), "-1"), no_tag_count])
entries = sorted(entries, key=lambda x: x[0].name.lower(), reverse=not order_no)
char_list = generate_char_list(entries)
return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list,
title=_("Categories"), page="catlist", data="category", order=order_no)
2017-11-12 13:06:33 +00:00
else:
abort(404)
# ################################### Download/Send ##################################################################
2017-04-14 18:29:11 +00:00
@web.route("/cover/<int:book_id>")
@web.route("/cover/<int:book_id>/<string:resolution>")
@login_required_if_no_ano
def get_cover(book_id, resolution=None):
resolutions = {
'og': constants.COVER_THUMBNAIL_ORIGINAL,
'sm': constants.COVER_THUMBNAIL_SMALL,
'md': constants.COVER_THUMBNAIL_MEDIUM,
'lg': constants.COVER_THUMBNAIL_LARGE,
}
cover_resolution = resolutions.get(resolution, None)
return get_book_cover(book_id, cover_resolution)
@web.route("/series_cover/<int:series_id>")
@web.route("/series_cover/<int:series_id>/<string:resolution>")
@login_required_if_no_ano
def get_series_cover(series_id, resolution=None):
resolutions = {
'og': constants.COVER_THUMBNAIL_ORIGINAL,
'sm': constants.COVER_THUMBNAIL_SMALL,
'md': constants.COVER_THUMBNAIL_MEDIUM,
'lg': constants.COVER_THUMBNAIL_LARGE,
}
cover_resolution = resolutions.get(resolution, None)
return get_series_cover_thumbnail(series_id, cover_resolution)
2020-07-04 19:22:19 +00:00
@web.route("/robots.txt")
def get_robots():
try:
return send_from_directory(constants.STATIC_DIR, "robots.txt")
except PermissionError:
log.error("No permission to access robots.txt file.")
abort(403)
2017-04-14 18:29:11 +00:00
@web.route("/show/<int:book_id>/<book_format>", defaults={'anyname': 'None'})
@web.route("/show/<int:book_id>/<book_format>/<anyname>")
@login_required_if_no_ano
@viewer_required
def serve_book(book_id, book_format, anyname):
book_format = book_format.split(".")[0]
2020-05-23 08:16:29 +00:00
book = calibre_db.get_book(book_id)
data = calibre_db.get_book_format(book_id, book_format.upper())
if not data:
return "File not in Database"
2023-02-03 15:31:49 +00:00
range_header = request.headers.get('Range', None)
if not range_header:
log.info('Serving book: \'%s\' to %s - %s', data.name, current_user.name,
request.headers.get('X-Forwarded-For', request.remote_addr))
if config.config_use_google_drive:
try:
headers = Headers()
headers["Content-Type"] = mimetypes.types_map.get('.' + book_format, "application/octet-stream")
if not range_header:
2023-02-22 17:59:11 +00:00
headers['Accept-Ranges'] = 'bytes'
df = getFileFromEbooksFolder(book.path, data.name + "." + book_format)
return do_gdrive_download(df, headers, (book_format.upper() == 'TXT'))
except AttributeError as ex:
log.error_or_exception(ex)
return "File Not Found"
else:
if book_format.upper() == 'TXT':
try:
rawdata = open(os.path.join(config.get_book_path(), book.path, data.name + "." + book_format),
"rb").read()
result = chardet.detect(rawdata)
try:
text_data = rawdata.decode(result['encoding']).encode('utf-8')
except UnicodeDecodeError as e:
log.error("Encoding error in text file {}: {}".format(book.id, e))
if "surrogate" in e.reason:
text_data = rawdata.decode(result['encoding'], 'surrogatepass').encode('utf-8', 'surrogatepass')
else:
text_data = rawdata.decode(result['encoding'], 'ignore').encode('utf-8', 'ignore')
return make_response(text_data)
except FileNotFoundError:
log.error("File Not Found")
return "File Not Found"
2023-02-22 17:59:11 +00:00
# enable byte range read of pdf
response = make_response(
send_from_directory(os.path.join(config.get_book_path(), book.path), data.name + "." + book_format))
2023-02-22 17:59:11 +00:00
if not range_header:
response.headers['Accept-Ranges'] = 'bytes'
return response
2020-04-01 16:45:16 +00:00
@web.route("/download/<int:book_id>/<book_format>", defaults={'anyname': 'None'})
@web.route("/download/<int:book_id>/<book_format>/<anyname>")
@login_required_if_no_ano
2016-04-27 08:35:23 +00:00
@download_required
2020-04-01 16:45:16 +00:00
def download_link(book_id, book_format, anyname):
client = "kobo" if "Kobo" in request.headers.get('User-Agent') else ""
return get_download_link(book_id, book_format, client)
@web.route('/send/<int:book_id>/<book_format>/<int:convert>', methods=["POST"])
@login_required_if_no_ano
@download_required
def send_to_ereader(book_id, book_format, convert):
2019-12-28 15:18:21 +00:00
if not config.get_mail_server_configured():
return make_response(jsonify(type="danger", message=_("Please configure the SMTP mail settings first...")))
elif current_user.kindle_mail:
result = send_mail(book_id, book_format, convert, current_user.kindle_mail, config.get_book_path(),
current_user.name)
if result is None:
ub.update_download(book_id, int(current_user.id))
response = [{'type': "success", 'message': _("Success! Book queued for sending to %(eReadermail)s",
eReadermail=current_user.kindle_mail)}]
else:
response = [{'type': "danger", 'message': _("Oops! There was an error sending book: %(res)s", res=result)}]
else:
response = [{'type': "danger", 'message': _("Oops! Please update your profile with a valid eReader Email.")}]
return make_response(jsonify(response))
2017-04-14 18:29:11 +00:00
# ################################### Login Logout ##################################################################
2023-02-12 12:10:00 +00:00
@web.route('/register', methods=['POST'])
@limiter.limit("40/day", key_func=get_remote_address)
@limiter.limit("3/minute", key_func=get_remote_address)
def register_post():
if not config.config_public_reg:
2015-10-13 00:30:55 +00:00
abort(404)
2023-02-12 12:10:00 +00:00
to_save = request.form.to_dict()
try:
limiter.check()
except RateLimitExceeded:
flash(_(u"Please wait one minute to register next user"), category="error")
return render_title_template('register.html', config=config, title=_("Register"), page="register")
2024-02-25 05:59:25 +00:00
except (ConnectionError, Exception) as e:
log.error("Connection error to limiter backend: %s", e)
flash(_("Connection error to limiter backend, please contact your administrator"), category="error")
return render_title_template('register.html', config=config, title=_("Register"), page="register")
if current_user is not None and current_user.is_authenticated:
return redirect(url_for('web.index'))
2019-12-28 15:18:21 +00:00
if not config.get_mail_server_configured():
flash(_("Oops! Email server is not configured, please contact your administrator."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
nickname = strip_whitespaces(to_save.get("email", "")) if config.config_register_email else to_save.get('name')
2023-02-12 12:10:00 +00:00
if not nickname or not to_save.get("email"):
flash(_("Oops! Please complete all fields."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
try:
nickname = check_username(nickname)
email = check_email(to_save.get("email", ""))
except Exception as ex:
flash(str(ex), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
2015-10-13 00:30:55 +00:00
2023-02-12 12:10:00 +00:00
content = ub.User()
if check_valid_domain(email):
content.name = nickname
content.email = email
2023-02-15 18:53:35 +00:00
password = generate_random_password(config.config_password_min_length)
content.password = generate_password_hash(password)
2023-02-12 12:10:00 +00:00
content.role = config.config_default_role
content.locale = config.config_default_locale
content.sidebar_view = config.config_default_show
try:
2023-02-12 12:10:00 +00:00
ub.session.add(content)
ub.session.commit()
if feature_support['oauth']:
register_user_with_oauth(content)
send_registration_mail(strip_whitespaces(to_save.get("email", "")), nickname, password)
2023-02-12 12:10:00 +00:00
except Exception:
ub.session.rollback()
flash(_("Oops! An unknown error occurred. Please try again later."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
2023-02-12 12:10:00 +00:00
else:
flash(_("Oops! Your Email is not allowed."), category="error")
log.warning('Registering failed for user "{}" Email: {}'.format(nickname, to_save.get("email","")))
return render_title_template('register.html', title=_("Register"), page="register")
flash(_("Success! Confirmation Email has been sent."), category="success")
return redirect(url_for('web.login'))
2020-05-12 15:23:58 +00:00
2015-10-13 00:30:55 +00:00
2023-02-12 12:10:00 +00:00
@web.route('/register', methods=['GET'])
def register():
if not config.config_public_reg:
abort(404)
if current_user is not None and current_user.is_authenticated:
return redirect(url_for('web.index'))
if not config.get_mail_server_configured():
flash(_("Oops! Email server is not configured, please contact your administrator."), category="error")
return render_title_template('register.html', title=_("Register"), page="register")
if feature_support['oauth']:
register_user_with_oauth()
return render_title_template('register.html', config=config, title=_("Register"), page="register")
2015-10-13 00:30:55 +00:00
2022-07-02 17:12:18 +00:00
def handle_login_user(user, remember, message, category):
login_user(user, remember=remember)
flash(message, category=category)
[limiter.limiter.storage.clear(k.key) for k in limiter.current_limits]
return redirect(get_redirect_location(request.form.get('next', None), "web.index"))
2022-07-02 17:12:18 +00:00
def render_login(username="", password=""):
next_url = request.args.get('next', default=url_for("web.index"), type=str)
if url_for("web.logout") == next_url:
next_url = url_for("web.index")
return render_title_template('login.html',
title=_("Login"),
next_url=next_url,
config=config,
username=username,
password=password,
oauth_check=oauth_check,
mail=config.get_mail_server_configured(), page="login")
@web.route('/login', methods=['GET'])
def login():
if current_user is not None and current_user.is_authenticated:
return redirect(url_for('web.index'))
if config.config_login_type == constants.LOGIN_LDAP and not services.ldap:
2019-12-28 15:18:21 +00:00
log.error(u"Cannot activate LDAP authentication")
flash(_(u"Cannot activate LDAP authentication"), category="error")
return render_login()
@web.route('/login', methods=['POST'])
@limiter.limit("40/day", key_func=lambda: strip_whitespaces(request.form.get('username', "")).lower())
@limiter.limit("3/minute", key_func=lambda: strip_whitespaces(request.form.get('username', "")).lower())
def login_post():
form = request.form.to_dict()
username = strip_whitespaces(form.get('username', "")).lower().replace("\n","").replace("\r","")
try:
limiter.check()
except RateLimitExceeded:
2024-02-25 05:59:25 +00:00
flash(_("Please wait one minute before next login"), category="error")
return render_login(username, form.get("password", ""))
except (ConnectionError, Exception) as e:
log.error("Connection error to limiter backend: %s", e)
flash(_("Connection error to limiter backend, please contact your administrator"), category="error")
2024-01-06 15:07:43 +00:00
return render_login(username, form.get("password", ""))
if current_user is not None and current_user.is_authenticated:
return redirect(url_for('web.index'))
if config.config_login_type == constants.LOGIN_LDAP and not services.ldap:
log.error(u"Cannot activate LDAP authentication")
flash(_(u"Cannot activate LDAP authentication"), category="error")
2024-01-06 15:07:43 +00:00
user = ub.session.query(ub.User).filter(func.lower(ub.User.name) == username).first()
remember_me = bool(form.get('remember_me'))
if config.config_login_type == constants.LOGIN_LDAP and services.ldap and user and form['password'] != "":
2024-01-06 15:07:43 +00:00
login_result, error = services.ldap.bind_user(username, form['password'])
if login_result:
log.debug(u"You are now logged in as: '{}'".format(user.name))
return handle_login_user(user,
remember_me,
_(u"you are now logged in as: '%(nickname)s'", nickname=user.name),
"success")
elif login_result is None and user and check_password_hash(str(user.password), form['password']) \
and user.name != "Guest":
log.info("Local Fallback Login as: '{}'".format(user.name))
return handle_login_user(user,
remember_me,
_(u"Fallback Login as: '%(nickname)s', "
u"LDAP Server not reachable, or user not known", nickname=user.name),
"warning")
elif login_result is None:
log.info(error)
flash(_(u"Could not login: %(message)s", message=error), category="error")
else:
ip_address = request.headers.get('X-Forwarded-For', request.remote_addr)
2024-01-06 15:07:43 +00:00
log.warning('LDAP Login failed for user "%s" IP-address: %s', username, ip_address)
flash(_(u"Wrong Username or Password"), category="error")
else:
ip_address = request.headers.get('X-Forwarded-For', request.remote_addr)
if form.get('forgot', "") == 'forgot':
if user is not None and user.name != "Guest":
ret, __ = reset_password(user.id)
if ret == 1:
2024-02-10 09:07:10 +00:00
flash(_(u"New Password was sent to your email address"), category="info")
2024-01-06 15:07:43 +00:00
log.info('Password reset for user "%s" IP-address: %s', username, ip_address)
else:
log.error(u"An unknown error occurred. Please try again later")
flash(_(u"An unknown error occurred. Please try again later."), category="error")
else:
flash(_(u"Please enter valid username to reset password"), category="error")
log.warning('Username missing for password reset IP-address: %s', ip_address)
else:
if user and check_password_hash(str(user.password), form['password']) and user.name != "Guest":
config.config_is_initial = False
2022-04-22 07:06:37 +00:00
log.debug(u"You are now logged in as: '{}'".format(user.name))
2022-07-02 17:12:18 +00:00
return handle_login_user(user,
remember_me,
_(u"You are now logged in as: '%(nickname)s'", nickname=user.name),
2022-07-02 17:12:18 +00:00
"success")
else:
2024-01-06 15:07:43 +00:00
log.warning('Login failed for user "{}" IP-address: {}'.format(username, ip_address))
flash(_(u"Wrong Username or Password"), category="error")
2024-01-06 15:07:43 +00:00
return render_login(username, form.get("password", ""))
@web.route('/logout')
@user_login_required
def logout():
if current_user is not None and current_user.is_authenticated:
ub.delete_user_session(current_user.id, flask_session.get('_id', ""))
logout_user()
if feature_support['oauth'] and (config.config_login_type == 2 or config.config_login_type == 3):
logout_oauth_user()
log.debug("User logged out")
if config.config_anonbrowse:
location = get_redirect_location(request.args.get('next', None), "web.login")
else:
location = None
if location:
return redirect(location)
else:
return redirect(url_for('web.login'))
# ################################### Users own configuration #########################################################
2021-07-23 18:12:37 +00:00
def change_profile(kobo_support, local_oauth_check, oauth_status, translations, languages):
2021-03-14 14:06:09 +00:00
to_save = request.form.to_dict()
current_user.random_books = 0
try:
2023-01-28 17:59:14 +00:00
if current_user.role_passwd() or current_user.role_admin():
if to_save.get("password", "") != "":
current_user.password = generate_password_hash(valid_password(to_save.get("password")))
2023-02-19 18:36:52 +00:00
if to_save.get("kindle_mail", current_user.kindle_mail) != current_user.kindle_mail:
current_user.kindle_mail = valid_email(to_save.get("kindle_mail"))
2022-08-28 13:59:25 +00:00
new_email = valid_email(to_save.get("email", current_user.email))
if not new_email:
raise Exception(_("Email can't be empty and has to be a valid Email"))
if new_email != current_user.email:
2022-08-28 13:59:25 +00:00
current_user.email = check_email(new_email)
2021-07-23 17:34:46 +00:00
if current_user.role_admin():
if to_save.get("name", current_user.name) != current_user.name:
# Query username, if not existing, change
2022-04-22 07:06:37 +00:00
current_user.name = check_username(to_save.get("name"))
current_user.random_books = 1 if to_save.get("show_random") == "on" else 0
2022-04-22 07:06:37 +00:00
current_user.default_language = to_save.get("default_language", "all")
current_user.locale = to_save.get("locale", "en")
2021-10-10 08:23:58 +00:00
old_state = current_user.kobo_only_shelves_sync
# 1 -> 0: nothing has to be done
# 0 -> 1: all synced books have to be added to archived books, + currently synced shelfs which
# don't have to be synced have to be removed (added to Shelf archive)
current_user.kobo_only_shelves_sync = int(to_save.get("kobo_only_shelves_sync") == "on") or 0
2021-10-10 08:23:58 +00:00
if old_state == 0 and current_user.kobo_only_shelves_sync == 1:
kobo_sync_status.update_on_sync_shelfs(current_user.id)
except Exception as ex:
flash(str(ex), category="error")
2021-07-23 18:12:37 +00:00
return render_title_template("user_edit.html",
content=current_user,
2022-06-16 08:44:42 +00:00
config=config,
2021-07-23 18:12:37 +00:00
translations=translations,
profile=1,
languages=languages,
title=_("%(name)s's Profile", name=current_user.name),
2021-07-23 18:12:37 +00:00
page="me",
kobo_support=kobo_support,
2021-07-23 18:12:37 +00:00
registered_oauth=local_oauth_check,
oauth_status=oauth_status)
2021-03-14 12:28:52 +00:00
val = 0
for key, __ in to_save.items():
if key.startswith('show'):
val += int(key[5:])
current_user.sidebar_view = val
if to_save.get("Show_detail_random"):
2021-03-14 12:28:52 +00:00
current_user.sidebar_view += constants.DETAIL_RANDOM
try:
ub.session.commit()
flash(_("Success! Profile Updated"), category="success")
2023-01-21 14:27:51 +00:00
log.debug("Profile updated")
2021-03-14 12:28:52 +00:00
except IntegrityError:
ub.session.rollback()
flash(_("Oops! An account already exists for this Email."), category="error")
log.debug("Found an existing account for this Email")
2021-03-14 12:28:52 +00:00
except OperationalError as e:
ub.session.rollback()
log.error("Database error: %s", e)
flash(_("Oops! Database Error: %(error)s.", error=e), category="error")
@web.route("/me", methods=["GET", "POST"])
@user_login_required
def profile():
2020-05-23 08:16:29 +00:00
languages = calibre_db.speaking_language()
translations = get_available_locale()
2020-01-27 19:32:37 +00:00
kobo_support = feature_support['kobo'] and config.config_kobo_sync
if feature_support['oauth'] and config.config_login_type == 2:
oauth_status = get_oauth_status()
2020-08-26 16:22:56 +00:00
local_oauth_check = oauth_check
else:
oauth_status = None
2020-08-26 16:22:56 +00:00
local_oauth_check = {}
2020-01-27 19:32:37 +00:00
if request.method == "POST":
2021-07-23 18:12:37 +00:00
change_profile(kobo_support, local_oauth_check, oauth_status, translations, languages)
return render_title_template("user_edit.html",
translations=translations,
profile=1,
languages=languages,
content=current_user,
2022-06-16 08:44:42 +00:00
config=config,
kobo_support=kobo_support,
title=_("%(name)s's Profile", name=current_user.name),
page="me",
registered_oauth=local_oauth_check,
oauth_status=oauth_status)
# ###################################Show single book ##################################################################
@web.route("/read/<int:book_id>/<book_format>")
@login_required_if_no_ano
@viewer_required
def read_book(book_id, book_format):
2020-05-23 08:16:29 +00:00
book = calibre_db.get_filtered_book(book_id)
2022-03-13 09:23:13 +00:00
if not book:
flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
category="error")
log.debug("Selected book is unavailable. File does not exist or is not accessible")
return redirect(url_for("web.index"))
book.ordered_authors = calibre_db.order_authors([book], False)
2022-03-13 09:23:13 +00:00
# check if book has a bookmark
bookmark = None
if current_user.is_authenticated:
bookmark = ub.session.query(ub.Bookmark).filter(and_(ub.Bookmark.user_id == int(current_user.id),
ub.Bookmark.book_id == book_id,
ub.Bookmark.format == book_format.upper())).first()
if book_format.lower() == "epub":
log.debug("Start epub reader for %d", book_id)
return render_title_template('read.html', bookid=book_id, title=book.title, bookmark=bookmark)
elif book_format.lower() == "pdf":
log.debug("Start pdf reader for %d", book_id)
return render_title_template('readpdf.html', pdffile=book_id, title=book.title)
elif book_format.lower() == "txt":
log.debug("Start txt reader for %d", book_id)
return render_title_template('readtxt.html', txtfile=book_id, title=book.title)
2023-04-15 13:25:46 +00:00
elif book_format.lower() in ["djvu", "djv"]:
log.debug("Start djvu reader for %d", book_id)
return render_title_template('readdjvu.html', djvufile=book_id, title=book.title,
extension=book_format.lower())
else:
for fileExt in constants.EXTENSIONS_AUDIO:
if book_format.lower() == fileExt:
2020-05-23 08:16:29 +00:00
entries = calibre_db.get_filtered_book(book_id)
log.debug("Start mp3 listening for %d", book_id)
return render_title_template('listenmp3.html', mp3file=book_id, audioformat=book_format.lower(),
entry=entries, bookmark=bookmark)
for fileExt in ["cbr", "cbt", "cbz"]:
if book_format.lower() == fileExt:
all_name = str(book_id)
title = book.title
if len(book.series):
title = title + " - " + book.series[0].name
if book.series_index:
title = title + " #" + '{0:.2f}'.format(book.series_index).rstrip('0').rstrip('.')
log.debug("Start comic reader for %d", book_id)
return render_title_template('readcbr.html', comicfile=all_name, title=title,
2023-08-26 18:12:34 +00:00
extension=fileExt, bookmark=bookmark)
log.debug("Selected book is unavailable. File does not exist or is not accessible")
flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
category="error")
return redirect(url_for("web.index"))
@web.route("/book/<int:book_id>")
@login_required_if_no_ano
def show_book(book_id):
2021-10-24 07:48:29 +00:00
entries = calibre_db.get_book_read_archived(book_id, config.config_read_column, allow_show_archived=True)
if entries:
2021-10-24 07:48:29 +00:00
read_book = entries[1]
archived_book = entries[2]
entry = entries[0]
entry.read_status = read_book == ub.ReadBook.STATUS_FINISHED
entry.is_archived = archived_book
for lang_index in range(0, len(entry.languages)):
entry.languages[lang_index].language_name = isoLanguages.get_language_name(get_locale(), entry.languages[
lang_index].lang_code)
cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True)
book_in_shelves = []
shelves = ub.session.query(ub.BookShelf).filter(ub.BookShelf.book_id == book_id).all()
for sh in shelves:
book_in_shelves.append(sh.shelf)
2019-02-27 18:30:13 +00:00
2021-10-24 07:48:29 +00:00
entry.tags = sort(entry.tags, key=lambda tag: tag.name)
entry.ordered_authors = calibre_db.order_authors([entry])
entry.email_share_list = check_send_to_ereader(entry)
2021-10-24 07:48:29 +00:00
entry.reader_list = check_read_formats(entry)
2017-03-13 00:44:20 +00:00
entry.audio_entries = []
2021-10-24 07:48:29 +00:00
for media_format in entry.data:
if media_format.format.lower() in constants.EXTENSIONS_AUDIO:
entry.audio_entries.append(media_format.format.lower())
return render_title_template('detail.html',
2021-10-24 07:48:29 +00:00
entry=entry,
cc=cc,
is_xhr=request.headers.get('X-Requested-With') == 'XMLHttpRequest',
2021-10-24 07:48:29 +00:00
title=entry.title,
books_shelfs=book_in_shelves,
page="book")
else:
log.debug("Selected book is unavailable. File does not exist or is not accessible")
flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"),
category="error")
return redirect(url_for("web.index"))