# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web) # Copyright (C) 2018-2019 OzzieIsaacs, cervinko, jkrehm, bodybybuddha, ok11, # andy29485, idalin, Kyosfonica, wuqi, Kennyl, lemmsh, # falgh1, grunjol, csitko, ytils, xybydy, trasba, vrabe, # ruben-herold, marblepebble, JackED42, SiphonSquirrel, # apetresc, nanu-c, mutschler # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os import json import mimetypes import chardet # dependency of requests import copy from importlib.metadata import metadata from flask import Blueprint, jsonify, request, redirect, send_from_directory, make_response, flash, abort, url_for from flask import session as flask_session from flask_babel import gettext as _ from flask_babel import get_locale from .cw_login import login_user, logout_user, current_user from flask_limiter import RateLimitExceeded from flask_limiter.util import get_remote_address from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError from sqlalchemy.sql.expression import text, func, false, not_, and_, or_ from sqlalchemy.orm.attributes import flag_modified from sqlalchemy.sql.functions import coalesce from werkzeug.datastructures import Headers from werkzeug.security import generate_password_hash, check_password_hash from . import constants, logger, isoLanguages, services from . import db, ub, config, app from . import calibre_db, kobo_sync_status from .search import render_search_results, render_adv_search_results from .gdriveutils import getFileFromEbooksFolder, do_gdrive_download from .helper import check_valid_domain, check_email, check_username, \ get_book_cover, get_series_cover_thumbnail, get_download_link, send_mail, generate_random_password, \ send_registration_mail, check_send_to_ereader, check_read_formats, tags_filters, reset_password, valid_email, \ edit_book_read_status, valid_password from .pagination import Pagination from .redirect import get_redirect_location from .cw_babel import get_available_locale from .usermanagement import login_required_if_no_ano from .kobo_sync_status import remove_synced_book from .render_template import render_title_template from .kobo_sync_status import change_archived_books from . import limiter from .services.worker import WorkerThread from .tasks_status import render_task_status from .usermanagement import user_login_required from .string_helper import strip_whitespaces feature_support = { 'ldap': bool(services.ldap), 'goodreads': bool(services.goodreads_support), 'kobo': bool(services.kobo) } try: from .oauth_bb import oauth_check, register_user_with_oauth, logout_oauth_user, get_oauth_status feature_support['oauth'] = True except ImportError: feature_support['oauth'] = False oauth_check = {} register_user_with_oauth = logout_oauth_user = get_oauth_status = None from functools import wraps try: from natsort import natsorted as sort except ImportError: sort = sorted # Just use regular sort then, may cause issues with badly named pages in cbz/cbr files sql_version = metadata("sqlalchemy")["Version"] sqlalchemy_version2 = ([int(x) if x.isnumeric() else 0 for x in sql_version.split('.')[:3]] >= [2, 0, 0]) @app.after_request def add_security_headers(resp): default_src = ([host.strip() for host in config.config_trustedhosts.split(',') if host] + ["'self'", "'unsafe-inline'", "'unsafe-eval'"]) csp = "default-src " + ' '.join(default_src) if request.endpoint == "web.read_book" and config.config_use_google_drive: csp +=" blob: " csp += "; font-src 'self' data:" if request.endpoint == "web.read_book": csp += " blob: " csp += "; img-src 'self'" if request.path.startswith("/author/") and config.config_use_goodreads: csp += " images.gr-assets.com i.gr-assets.com s.gr-assets.com" csp += " data:" if request.endpoint == "edit-book.show_edit_book" or config.config_use_google_drive: csp += " *" if request.endpoint == "web.read_book": csp += " blob: ; style-src-elem 'self' blob: 'unsafe-inline'" csp += "; object-src 'none';" resp.headers['Content-Security-Policy'] = csp resp.headers['X-Content-Type-Options'] = 'nosniff' resp.headers['X-Frame-Options'] = 'SAMEORIGIN' resp.headers['X-XSS-Protection'] = '1; mode=block' resp.headers['Strict-Transport-Security'] = 'max-age=31536000'; return resp web = Blueprint('web', __name__) log = logger.create() # ################################### Login logic and rights management ############################################### def download_required(f): @wraps(f) def inner(*args, **kwargs): if current_user.role_download(): return f(*args, **kwargs) abort(403) return inner def viewer_required(f): @wraps(f) def inner(*args, **kwargs): if current_user.role_viewer(): return f(*args, **kwargs) abort(403) return inner # ################################### data provider functions ######################################################### @web.route("/ajax/emailstat") @user_login_required def get_email_status_json(): tasks = WorkerThread.get_instance().tasks return jsonify(render_task_status(tasks)) @web.route("/ajax/bookmark//", methods=['POST']) @user_login_required def set_bookmark(book_id, book_format): bookmark_key = request.form["bookmark"] ub.session.query(ub.Bookmark).filter(and_(ub.Bookmark.user_id == int(current_user.id), ub.Bookmark.book_id == book_id, ub.Bookmark.format == book_format)).delete() if not bookmark_key: ub.session_commit() return "", 204 l_bookmark = ub.Bookmark(user_id=current_user.id, book_id=book_id, format=book_format, bookmark_key=bookmark_key) ub.session.merge(l_bookmark) ub.session_commit("Bookmark for user {} in book {} created".format(current_user.id, book_id)) return "", 201 @web.route("/ajax/toggleread/", methods=['POST']) @user_login_required def toggle_read(book_id): message = edit_book_read_status(book_id) if message: return message, 400 else: return message @web.route("/ajax/togglearchived/", methods=['POST']) @user_login_required def toggle_archived(book_id): change_archived_books(book_id, message="Book {} archive bit toggled".format(book_id)) # Remove book from syncd books list to force resync (?) remove_synced_book(book_id) return "" @web.route("/ajax/view", methods=["POST"]) @login_required_if_no_ano def update_view(): to_save = request.get_json() try: for element in to_save: for param in to_save[element]: current_user.set_view_property(element, param, to_save[element][param]) except Exception as ex: log.error("Could not save view_settings: %r %r: %e", request, to_save, ex) return "Invalid request", 400 return "1", 200 ''' @web.route("/ajax/getcomic///") @user_login_required def get_comic_book(book_id, book_format, page): book = calibre_db.get_book(book_id) if not book: return "", 204 else: for bookformat in book.data: if bookformat.format.lower() == book_format.lower(): cbr_file = os.path.join(config.config_calibre_dir, book.path, bookformat.name) + "." + book_format if book_format in ("cbr", "rar"): if feature_support['rar'] == True: rarfile.UNRAR_TOOL = config.config_rarfile_location try: rf = rarfile.RarFile(cbr_file) names = sort(rf.namelist()) extract = lambda page: rf.read(names[page]) except: # rarfile not valid log.error('Unrar binary not found, or unable to decompress file %s', cbr_file) return "", 204 else: log.info('Unrar is not supported please install python rarfile extension') # no support means return nothing return "", 204 elif book_format in ("cbz", "zip"): zf = zipfile.ZipFile(cbr_file) names=sort(zf.namelist()) extract = lambda page: zf.read(names[page]) elif book_format in ("cbt", "tar"): tf = tarfile.TarFile(cbr_file) names=sort(tf.getnames()) extract = lambda page: tf.extractfile(names[page]).read() else: log.error('unsupported comic format') return "", 204 b64 = codecs.encode(extract(page), 'base64').decode() ext = names[page].rpartition('.')[-1] if ext not in ('png', 'gif', 'jpg', 'jpeg', 'webp'): ext = 'png' extractedfile="data:image/" + ext + ";base64," + b64 fileData={"name": names[page], "page":page, "last":len(names)-1, "content": extractedfile} return make_response(json.dumps(fileData)) return "", 204 ''' # ################################### Typeahead ################################################################## @web.route("/get_authors_json", methods=['GET']) @login_required_if_no_ano def get_authors_json(): return calibre_db.get_typeahead(db.Authors, request.args.get('q'), ('|', ',')) @web.route("/get_publishers_json", methods=['GET']) @login_required_if_no_ano def get_publishers_json(): return calibre_db.get_typeahead(db.Publishers, request.args.get('q'), ('|', ',')) @web.route("/get_tags_json", methods=['GET']) @login_required_if_no_ano def get_tags_json(): return calibre_db.get_typeahead(db.Tags, request.args.get('q'), tag_filter=tags_filters()) @web.route("/get_series_json", methods=['GET']) @login_required_if_no_ano def get_series_json(): return calibre_db.get_typeahead(db.Series, request.args.get('q')) @web.route("/get_languages_json", methods=['GET']) @login_required_if_no_ano def get_languages_json(): query = (request.args.get('q') or '').lower() language_names = isoLanguages.get_language_names(get_locale()) entries_start = [s for key, s in language_names.items() if s.lower().startswith(query.lower())] if len(entries_start) < 5: entries = [s for key, s in language_names.items() if query in s.lower()] entries_start.extend(entries[0:(5 - len(entries_start))]) entries_start = list(set(entries_start)) json_dumps = json.dumps([dict(name=r) for r in entries_start[0:5]]) return json_dumps @web.route("/get_matching_tags", methods=['GET']) @login_required_if_no_ano def get_matching_tags(): tag_dict = {'tags': []} q = calibre_db.session.query(db.Books).filter(calibre_db.common_filters(True)) calibre_db.create_functions() # calibre_db.session.connection().connection.connection.create_function("lower", 1, db.lcase) author_input = request.args.get('authors') or '' title_input = request.args.get('title') or '' include_tag_inputs = request.args.getlist('include_tag') or '' exclude_tag_inputs = request.args.getlist('exclude_tag') or '' q = q.filter(db.Books.authors.any(func.lower(db.Authors.name).ilike("%" + author_input + "%")), func.lower(db.Books.title).ilike("%" + title_input + "%")) if len(include_tag_inputs) > 0: for tag in include_tag_inputs: q = q.filter(db.Books.tags.any(db.Tags.id == tag)) if len(exclude_tag_inputs) > 0: for tag in exclude_tag_inputs: q = q.filter(not_(db.Books.tags.any(db.Tags.id == tag))) for book in q: for tag in book.tags: if tag.id not in tag_dict['tags']: tag_dict['tags'].append(tag.id) json_dumps = json.dumps(tag_dict) return json_dumps def generate_char_list(entries): # data_colum, db_link): char_list = list() for entry in entries: upper_char = entry[0].name[0].upper() if upper_char not in char_list: char_list.append(upper_char) return char_list def query_char_list(data_colum, db_link): results = (calibre_db.session.query(func.upper(func.substr(data_colum, 1, 1)).label('char')) .join(db_link).join(db.Books).filter(calibre_db.common_filters()) .group_by(func.upper(func.substr(data_colum, 1, 1))).all()) return results def get_sort_function(sort_param, data): order = [db.Books.timestamp.desc()] if sort_param == 'stored': sort_param = current_user.get_view_property(data, 'stored') else: current_user.set_view_property(data, 'stored', sort_param) if sort_param == 'pubnew': order = [db.Books.pubdate.desc()] if sort_param == 'pubold': order = [db.Books.pubdate] if sort_param == 'abc': order = [db.Books.sort] if sort_param == 'zyx': order = [db.Books.sort.desc()] if sort_param == 'new': order = [db.Books.timestamp.desc()] if sort_param == 'old': order = [db.Books.timestamp] if sort_param == 'authaz': order = [db.Books.author_sort.asc(), db.Series.name, db.Books.series_index] if sort_param == 'authza': order = [db.Books.author_sort.desc(), db.Series.name.desc(), db.Books.series_index.desc()] if sort_param == 'seriesasc': order = [db.Books.series_index.asc()] if sort_param == 'seriesdesc': order = [db.Books.series_index.desc()] if sort_param == 'hotdesc': order = [func.count(ub.Downloads.book_id).desc()] if sort_param == 'hotasc': order = [func.count(ub.Downloads.book_id).asc()] if sort_param is None: sort_param = "new" return order, sort_param def render_books_list(data, sort_param, book_id, page): order = get_sort_function(sort_param, data) if data == "rated": return render_rated_books(page, book_id, order=order) elif data == "discover": return render_discover_books(book_id) elif data == "unread": return render_read_books(page, False, order=order) elif data == "read": return render_read_books(page, True, order=order) elif data == "hot": return render_hot_books(page, order) elif data == "download": return render_downloaded_books(page, order, book_id) elif data == "author": return render_author_books(page, book_id, order) elif data == "publisher": return render_publisher_books(page, book_id, order) elif data == "series": return render_series_books(page, book_id, order) elif data == "ratings": return render_ratings_books(page, book_id, order) elif data == "formats": return render_formats_books(page, book_id, order) elif data == "category": return render_category_books(page, book_id, order) elif data == "language": return render_language_books(page, book_id, order) elif data == "archived": return render_archived_books(page, order) elif data == "search": term = request.args.get('query', None) offset = int(int(config.config_books_per_page) * (page - 1)) return render_search_results(term, offset, order, config.config_books_per_page) elif data == "advsearch": term = json.loads(flask_session.get('query', '{}')) offset = int(int(config.config_books_per_page) * (page - 1)) return render_adv_search_results(term, offset, order, config.config_books_per_page) else: website = data or "newest" entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, True, order[0], True, config.config_read_column, db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series) return render_title_template('index.html', random=random, entries=entries, pagination=pagination, title=_("Books"), page=website, order=order[1]) def render_rated_books(page, book_id, order): if current_user.check_visibility(constants.SIDEBAR_BEST_RATED): entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Books.ratings.any(db.Ratings.rating > 9), order[0], True, config.config_read_column, db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series) return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=book_id, title=_("Top Rated Books"), page="rated", order=order[1]) else: abort(404) def render_discover_books(book_id): if current_user.check_visibility(constants.SIDEBAR_RANDOM): entries, __, ___ = calibre_db.fill_indexpage(1, 0, db.Books, True, [func.randomblob(2)], join_archive_read=True, config_read_column=config.config_read_column) pagination = Pagination(1, config.config_books_per_page, config.config_books_per_page) return render_title_template('index.html', random=false(), entries=entries, pagination=pagination, id=book_id, title=_("Discover (Random Books)"), page="discover") else: abort(404) def render_hot_books(page, order): if current_user.check_visibility(constants.SIDEBAR_HOT): if order[1] not in ['hotasc', 'hotdesc']: # Unary expression comparison only working (for this expression) in sqlalchemy 1.4+ # if not (order[0][0].compare(func.count(ub.Downloads.book_id).desc()) or # order[0][0].compare(func.count(ub.Downloads.book_id).asc())): order = [func.count(ub.Downloads.book_id).desc()], 'hotdesc' if current_user.show_detail_random(): random_query = calibre_db.generate_linked_query(config.config_read_column, db.Books) random = (random_query.filter(calibre_db.common_filters()) .order_by(func.random()) .limit(config.config_random_books).all()) else: random = false() off = int(int(config.config_books_per_page) * (page - 1)) all_books = ub.session.query(ub.Downloads, func.count(ub.Downloads.book_id)) \ .order_by(*order[0]).group_by(ub.Downloads.book_id) hot_books = all_books.offset(off).limit(config.config_books_per_page) entries = list() for book in hot_books: query = calibre_db.generate_linked_query(config.config_read_column, db.Books) download_book = query.filter(calibre_db.common_filters()).filter( book.Downloads.book_id == db.Books.id).first() if download_book: entries.append(download_book) else: ub.delete_download(book.Downloads.book_id) num_books = entries.__len__() pagination = Pagination(page, config.config_books_per_page, num_books) return render_title_template('index.html', random=random, entries=entries, pagination=pagination, title=_("Hot Books (Most Downloaded)"), page="hot", order=order[1]) else: abort(404) def render_downloaded_books(page, order, user_id): if current_user.role_admin(): user_id = int(user_id) else: user_id = current_user.id user = ub.session.query(ub.User).filter(ub.User.id == user_id).first() if current_user.check_visibility(constants.SIDEBAR_DOWNLOAD) and user: entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, ub.Downloads.user_id == user_id, order[0], True, config.config_read_column, db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series, ub.Downloads, db.Books.id == ub.Downloads.book_id) for book in entries: if not (calibre_db.session.query(db.Books).filter(calibre_db.common_filters()) .filter(db.Books.id == book.Books.id).first()): ub.delete_download(book.Books.id) return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=user_id, title=_("Downloaded books by %(user)s", user=user.name), page="download", order=order[1]) else: abort(404) def render_author_books(page, author_id, order): entries, __, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Books.authors.any(db.Authors.id == author_id), [order[0][0], db.Series.name, db.Books.series_index], True, config.config_read_column, db.books_series_link, db.books_series_link.c.book == db.Books.id, db.Series) if entries is None or not len(entries): flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"), category="error") return redirect(url_for("web.index")) if sqlalchemy_version2: author = calibre_db.session.get(db.Authors, author_id) else: author = calibre_db.session.query(db.Authors).get(author_id) author_name = author.name.replace('|', ',') author_info = None other_books = [] if services.goodreads_support and config.config_use_goodreads: author_info = services.goodreads_support.get_author_info(author_name) book_entries = [entry.Books for entry in entries] other_books = services.goodreads_support.get_other_books(author_info, book_entries) return render_title_template('author.html', entries=entries, pagination=pagination, id=author_id, title=_("Author: %(name)s", name=author_name), author=author_info, other_books=other_books, page="author", order=order[1]) def render_publisher_books(page, book_id, order): if book_id == '-1': entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Publishers.name == None, [db.Series.name, order[0][0], db.Books.series_index], True, config.config_read_column, db.books_publishers_link, db.Books.id == db.books_publishers_link.c.book, db.Publishers, db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series) publisher = _("None") else: publisher = calibre_db.session.query(db.Publishers).filter(db.Publishers.id == book_id).first() if publisher: entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Books.publishers.any( db.Publishers.id == book_id), [db.Series.name, order[0][0], db.Books.series_index], True, config.config_read_column, db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series) publisher = publisher.name else: abort(404) return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=book_id, title=_("Publisher: %(name)s", name=publisher), page="publisher", order=order[1]) def render_series_books(page, book_id, order): if book_id == '-1': entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Series.name == None, [order[0][0]], True, config.config_read_column, db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series) series_name = _("None") else: series_name = calibre_db.session.query(db.Series).filter(db.Series.id == book_id).first() if series_name: entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Books.series.any(db.Series.id == book_id), [order[0][0]], True, config.config_read_column) series_name = series_name.name else: abort(404) return render_title_template('index.html', random=random, pagination=pagination, entries=entries, id=book_id, title=_("Series: %(serie)s", serie=series_name), page="series", order=order[1]) def render_ratings_books(page, book_id, order): if book_id == '-1': db_filter = coalesce(db.Ratings.rating, 0) < 1 entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db_filter, [order[0][0]], True, config.config_read_column, db.books_ratings_link, db.Books.id == db.books_ratings_link.c.book, db.Ratings) title = _("Rating: None") else: name = calibre_db.session.query(db.Ratings).filter(db.Ratings.id == book_id).first() if name: entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Books.ratings.any(db.Ratings.id == book_id), [order[0][0]], True, config.config_read_column) title = _("Rating: %(rating)s stars", rating=int(name.rating / 2)) else: abort(404) return render_title_template('index.html', random=random, pagination=pagination, entries=entries, id=book_id, title=title, page="ratings", order=order[1]) def render_formats_books(page, book_id, order): if book_id == '-1': name = _("None") entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Data.format == None, [order[0][0]], True, config.config_read_column, db.Data) else: name = calibre_db.session.query(db.Data).filter(db.Data.format == book_id.upper()).first() if name: name = name.format entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Books.data.any( db.Data.format == book_id.upper()), [order[0][0]], True, config.config_read_column) else: abort(404) return render_title_template('index.html', random=random, pagination=pagination, entries=entries, id=book_id, title=_("File format: %(format)s", format=name), page="formats", order=order[1]) def render_category_books(page, book_id, order): if book_id == '-1': entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Tags.name == None, [order[0][0], db.Series.name, db.Books.series_index], True, config.config_read_column, db.books_tags_link, db.Books.id == db.books_tags_link.c.book, db.Tags, db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series) tagsname = _("None") else: tagsname = calibre_db.session.query(db.Tags).filter(db.Tags.id == book_id).first() if tagsname: entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Books.tags.any(db.Tags.id == book_id), [order[0][0], db.Series.name, db.Books.series_index], True, config.config_read_column, db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series) tagsname = tagsname.name else: abort(404) return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=book_id, title=_("Category: %(name)s", name=tagsname), page="category", order=order[1]) def render_language_books(page, name, order): try: if name.lower() != "none": lang_name = isoLanguages.get_language_name(get_locale(), name) if lang_name == "Unknown": abort(404) else: lang_name = _("None") except KeyError: abort(404) if name == "none": entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Languages.lang_code == None, [order[0][0]], True, config.config_read_column, db.books_languages_link, db.Books.id == db.books_languages_link.c.book, db.Languages) else: entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db.Books.languages.any(db.Languages.lang_code == name), [order[0][0]], True, config.config_read_column) return render_title_template('index.html', random=random, entries=entries, pagination=pagination, id=name, title=_("Language: %(name)s", name=lang_name), page="language", order=order[1]) def render_read_books(page, are_read, as_xml=False, order=None): sort_param = order[0] if order else [] if not config.config_read_column: if are_read: db_filter = and_(ub.ReadBook.user_id == int(current_user.id), ub.ReadBook.read_status == ub.ReadBook.STATUS_FINISHED) else: db_filter = coalesce(ub.ReadBook.read_status, 0) != ub.ReadBook.STATUS_FINISHED else: try: if are_read: db_filter = db.cc_classes[config.config_read_column].value == True else: db_filter = coalesce(db.cc_classes[config.config_read_column].value, False) != True except (KeyError, AttributeError, IndexError): log.error("Custom Column No.{} does not exist in calibre database".format(config.config_read_column)) if not as_xml: flash(_("Custom Column No.%(column)d does not exist in calibre database", column=config.config_read_column), category="error") return redirect(url_for("web.index")) return [] # ToDo: Handle error Case for opds entries, random, pagination = calibre_db.fill_indexpage(page, 0, db.Books, db_filter, sort_param, True, config.config_read_column, db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series) if as_xml: return entries, pagination else: if are_read: name = _('Read Books') + ' (' + str(pagination.total_count) + ')' page_name = "read" else: name = _('Unread Books') + ' (' + str(pagination.total_count) + ')' page_name = "unread" return render_title_template('index.html', random=random, entries=entries, pagination=pagination, title=name, page=page_name, order=order[1]) def render_archived_books(page, sort_param): order = sort_param[0] or [] archived_books = (ub.session.query(ub.ArchivedBook) .filter(ub.ArchivedBook.user_id == int(current_user.id)) .filter(ub.ArchivedBook.is_archived == True) .all()) archived_book_ids = [archived_book.book_id for archived_book in archived_books] archived_filter = db.Books.id.in_(archived_book_ids) entries, random, pagination = calibre_db.fill_indexpage_with_archived_books(page, db.Books, 0, archived_filter, order, True, True, config.config_read_column) name = _('Archived Books') + ' (' + str(len(entries)) + ')' page_name = "archived" return render_title_template('index.html', random=random, entries=entries, pagination=pagination, title=name, page=page_name, order=sort_param[1]) # ################################### View Books list ################################################################## @web.route("/", defaults={'page': 1}) @web.route('/page/') @login_required_if_no_ano def index(page): sort_param = (request.args.get('sort') or 'stored').lower() return render_books_list("newest", sort_param, 1, page) @web.route('//', defaults={'page': 1, 'book_id': 1}) @web.route('///', defaults={'page': 1, 'book_id': 1}) @web.route('///', defaults={'page': 1}) @web.route('////') @login_required_if_no_ano def books_list(data, sort_param, book_id, page): return render_books_list(data, sort_param, book_id, page) @web.route("/table") @user_login_required def books_table(): visibility = current_user.view_settings.get('table', {}) cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True) return render_title_template('book_table.html', title=_("Books List"), cc=cc, page="book_table", visiblility=visibility) @web.route("/ajax/listbooks") @user_login_required def list_books(): off = int(request.args.get("offset") or 0) limit = int(request.args.get("limit") or config.config_books_per_page) search_param = request.args.get("search") sort_param = request.args.get("sort", "id") order = request.args.get("order", "").lower() state = None join = tuple() if sort_param == "state": state = json.loads(request.args.get("state", "[]")) elif sort_param == "tags": order = [db.Tags.name.asc()] if order == "asc" else [db.Tags.name.desc()] join = db.books_tags_link, db.Books.id == db.books_tags_link.c.book, db.Tags elif sort_param == "series": order = [db.Series.name.asc()] if order == "asc" else [db.Series.name.desc()] join = db.books_series_link, db.Books.id == db.books_series_link.c.book, db.Series elif sort_param == "publishers": order = [db.Publishers.name.asc()] if order == "asc" else [db.Publishers.name.desc()] join = db.books_publishers_link, db.Books.id == db.books_publishers_link.c.book, db.Publishers elif sort_param == "authors": order = [db.Authors.name.asc(), db.Series.name, db.Books.series_index] if order == "asc" \ else [db.Authors.name.desc(), db.Series.name.desc(), db.Books.series_index.desc()] join = db.books_authors_link, db.Books.id == db.books_authors_link.c.book, db.Authors, db.books_series_link, \ db.Books.id == db.books_series_link.c.book, db.Series elif sort_param == "languages": order = [db.Languages.lang_code.asc()] if order == "asc" else [db.Languages.lang_code.desc()] join = db.books_languages_link, db.Books.id == db.books_languages_link.c.book, db.Languages elif order and sort_param in ["sort", "title", "authors_sort", "series_index"]: order = [text(sort_param + " " + order)] elif not state: order = [db.Books.timestamp.desc()] total_count = filtered_count = calibre_db.session.query(db.Books).filter( calibre_db.common_filters(allow_show_archived=True)).count() if state is not None: if search_param: books = calibre_db.search_query(search_param, config).all() filtered_count = len(books) else: query = calibre_db.generate_linked_query(config.config_read_column, db.Books) books = query.filter(calibre_db.common_filters(allow_show_archived=True)).all() entries = calibre_db.get_checkbox_sorted(books, state, off, limit, order, True) elif search_param: entries, filtered_count, __ = calibre_db.get_search_results(search_param, config, off, [order, ''], limit, *join) else: entries, __, __ = calibre_db.fill_indexpage_with_archived_books((int(off) / (int(limit)) + 1), db.Books, limit, True, order, True, True, config.config_read_column, *join) result = list() for entry in entries: val = entry[0] val.is_archived = entry[1] is True val.read_status = entry[2] == ub.ReadBook.STATUS_FINISHED for lang_index in range(0, len(val.languages)): val.languages[lang_index].language_name = isoLanguages.get_language_name(get_locale(), val.languages[ lang_index].lang_code) result.append(val) table_entries = {'totalNotFiltered': total_count, 'total': filtered_count, "rows": result} js_list = json.dumps(table_entries, cls=db.AlchemyEncoder) response = make_response(js_list) response.headers["Content-Type"] = "application/json; charset=utf-8" return response @web.route("/ajax/table_settings", methods=['POST']) @user_login_required def update_table_settings(): current_user.view_settings['table'] = json.loads(request.data) try: try: flag_modified(current_user, "view_settings") except AttributeError: pass ub.session.commit() except (InvalidRequestError, OperationalError): log.error("Invalid request received: %r ", request, ) return "Invalid request", 400 return "" @web.route("/author") @login_required_if_no_ano def author_list(): if current_user.check_visibility(constants.SIDEBAR_AUTHOR): if current_user.get_view_property('author', 'dir') == 'desc': order = db.Authors.sort.desc() order_no = 0 else: order = db.Authors.sort.asc() order_no = 1 entries = calibre_db.session.query(db.Authors, func.count('books_authors_link.book').label('count')) \ .join(db.books_authors_link).join(db.Books).filter(calibre_db.common_filters()) \ .group_by(text('books_authors_link.author')).order_by(order).all() char_list = query_char_list(db.Authors.sort, db.books_authors_link) # If not creating a copy, readonly databases can not display authornames with "|" in it as changing the name # starts a change session author_copy = copy.deepcopy(entries) for entry in author_copy: entry.Authors.name = entry.Authors.name.replace('|', ',') return render_title_template('list.html', entries=author_copy, folder='web.books_list', charlist=char_list, title="Authors", page="authorlist", data='author', order=order_no) else: abort(404) @web.route("/downloadlist") @login_required_if_no_ano def download_list(): if current_user.get_view_property('download', 'dir') == 'desc': order = ub.User.name.desc() order_no = 0 else: order = ub.User.name.asc() order_no = 1 if current_user.check_visibility(constants.SIDEBAR_DOWNLOAD) and current_user.role_admin(): entries = ub.session.query(ub.User, func.count(ub.Downloads.book_id).label('count')) \ .join(ub.Downloads).group_by(ub.Downloads.user_id).order_by(order).all() char_list = ub.session.query(func.upper(func.substr(ub.User.name, 1, 1)).label('char')) \ .filter(ub.User.role.op('&')(constants.ROLE_ANONYMOUS) != constants.ROLE_ANONYMOUS) \ .group_by(func.upper(func.substr(ub.User.name, 1, 1))).all() return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list, title=_("Downloads"), page="downloadlist", data="download", order=order_no) else: abort(404) @web.route("/publisher") @login_required_if_no_ano def publisher_list(): if current_user.get_view_property('publisher', 'dir') == 'desc': order = db.Publishers.name.desc() order_no = 0 else: order = db.Publishers.name.asc() order_no = 1 if current_user.check_visibility(constants.SIDEBAR_PUBLISHER): entries = calibre_db.session.query(db.Publishers, func.count('books_publishers_link.book').label('count')) \ .join(db.books_publishers_link).join(db.Books).filter(calibre_db.common_filters()) \ .group_by(text('books_publishers_link.publisher')).order_by(order).all() no_publisher_count = (calibre_db.session.query(db.Books) .outerjoin(db.books_publishers_link).outerjoin(db.Publishers) .filter(db.Publishers.name == None) .filter(calibre_db.common_filters()) .count()) if no_publisher_count: entries.append([db.Category(_("None"), "-1"), no_publisher_count]) entries = sorted(entries, key=lambda x: x[0].name.lower(), reverse=not order_no) char_list = generate_char_list(entries) return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list, title=_("Publishers"), page="publisherlist", data="publisher", order=order_no) else: abort(404) @web.route("/series") @login_required_if_no_ano def series_list(): if current_user.check_visibility(constants.SIDEBAR_SERIES): if current_user.get_view_property('series', 'dir') == 'desc': order = db.Series.sort.desc() order_no = 0 else: order = db.Series.sort.asc() order_no = 1 char_list = query_char_list(db.Series.sort, db.books_series_link) if current_user.get_view_property('series', 'series_view') == 'list': entries = calibre_db.session.query(db.Series, func.count('books_series_link.book').label('count')) \ .join(db.books_series_link).join(db.Books).filter(calibre_db.common_filters()) \ .group_by(text('books_series_link.series')).order_by(order).all() no_series_count = (calibre_db.session.query(db.Books) .outerjoin(db.books_series_link).outerjoin(db.Series) .filter(db.Series.name == None) .filter(calibre_db.common_filters()) .count()) if no_series_count: entries.append([db.Category(_("None"), "-1"), no_series_count]) entries = sorted(entries, key=lambda x: x[0].name.lower(), reverse=not order_no) return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list, title=_("Series"), page="serieslist", data="series", order=order_no) else: entries = (calibre_db.session.query(db.Books, func.count('books_series_link').label('count'), func.max(db.Books.series_index), db.Books.id) .join(db.books_series_link).join(db.Series).filter(calibre_db.common_filters()) .group_by(text('books_series_link.series')) .having(or_(func.max(db.Books.series_index), db.Books.series_index=="")) .order_by(order) .all()) return render_title_template('grid.html', entries=entries, folder='web.books_list', charlist=char_list, title=_("Series"), page="serieslist", data="series", bodyClass="grid-view", order=order_no) else: abort(404) @web.route("/ratings") @login_required_if_no_ano def ratings_list(): if current_user.check_visibility(constants.SIDEBAR_RATING): if current_user.get_view_property('ratings', 'dir') == 'desc': order = db.Ratings.rating.desc() order_no = 0 else: order = db.Ratings.rating.asc() order_no = 1 entries = calibre_db.session.query(db.Ratings, func.count('books_ratings_link.book').label('count'), (db.Ratings.rating / 2).label('name')) \ .join(db.books_ratings_link).join(db.Books).filter(calibre_db.common_filters()) \ .filter(db.Ratings.rating > 0) \ .group_by(text('books_ratings_link.rating')).order_by(order).all() no_rating_count = (calibre_db.session.query(db.Books) .outerjoin(db.books_ratings_link).outerjoin(db.Ratings) .filter(or_(db.Ratings.rating == None, db.Ratings.rating == 0)) .filter(calibre_db.common_filters()) .count()) if no_rating_count: entries.append([db.Category(_("None"), "-1", -1), no_rating_count]) entries = sorted(entries, key=lambda x: x[0].rating, reverse=not order_no) return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=list(), title=_("Ratings list"), page="ratingslist", data="ratings", order=order_no) else: abort(404) @web.route("/formats") @login_required_if_no_ano def formats_list(): if current_user.check_visibility(constants.SIDEBAR_FORMAT): if current_user.get_view_property('formats', 'dir') == 'desc': order = db.Data.format.desc() order_no = 0 else: order = db.Data.format.asc() order_no = 1 entries = calibre_db.session.query(db.Data, func.count('data.book').label('count'), db.Data.format.label('format')) \ .join(db.Books).filter(calibre_db.common_filters()) \ .group_by(db.Data.format).order_by(order).all() no_format_count = (calibre_db.session.query(db.Books).outerjoin(db.Data) .filter(db.Data.format == None) .filter(calibre_db.common_filters()) .count()) if no_format_count: entries.append([db.Category(_("None"), "-1"), no_format_count]) return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=list(), title=_("File formats list"), page="formatslist", data="formats", order=order_no) else: abort(404) @web.route("/language") @login_required_if_no_ano def language_overview(): if current_user.check_visibility(constants.SIDEBAR_LANGUAGE) and current_user.filter_language() == "all": order_no = 0 if current_user.get_view_property('language', 'dir') == 'desc' else 1 languages = calibre_db.speaking_language(reverse_order=not order_no, with_count=True) char_list = generate_char_list(languages) return render_title_template('list.html', entries=languages, folder='web.books_list', charlist=char_list, title=_("Languages"), page="langlist", data="language", order=order_no) else: abort(404) @web.route("/category") @login_required_if_no_ano def category_list(): if current_user.check_visibility(constants.SIDEBAR_CATEGORY): if current_user.get_view_property('category', 'dir') == 'desc': order = db.Tags.name.desc() order_no = 0 else: order = db.Tags.name.asc() order_no = 1 entries = calibre_db.session.query(db.Tags, func.count('books_tags_link.book').label('count')) \ .join(db.books_tags_link).join(db.Books).order_by(order).filter(calibre_db.common_filters()) \ .group_by(db.Tags.id).all() no_tag_count = (calibre_db.session.query(db.Books) .outerjoin(db.books_tags_link).outerjoin(db.Tags) .filter(db.Tags.name == None) .filter(calibre_db.common_filters()) .count()) if no_tag_count: entries.append([db.Category(_("None"), "-1"), no_tag_count]) entries = sorted(entries, key=lambda x: x[0].name.lower(), reverse=not order_no) char_list = generate_char_list(entries) return render_title_template('list.html', entries=entries, folder='web.books_list', charlist=char_list, title=_("Categories"), page="catlist", data="category", order=order_no) else: abort(404) # ################################### Download/Send ################################################################## @web.route("/cover/") @web.route("/cover//") @login_required_if_no_ano def get_cover(book_id, resolution=None): resolutions = { 'og': constants.COVER_THUMBNAIL_ORIGINAL, 'sm': constants.COVER_THUMBNAIL_SMALL, 'md': constants.COVER_THUMBNAIL_MEDIUM, 'lg': constants.COVER_THUMBNAIL_LARGE, } cover_resolution = resolutions.get(resolution, None) return get_book_cover(book_id, cover_resolution) @web.route("/series_cover/") @web.route("/series_cover//") @login_required_if_no_ano def get_series_cover(series_id, resolution=None): resolutions = { 'og': constants.COVER_THUMBNAIL_ORIGINAL, 'sm': constants.COVER_THUMBNAIL_SMALL, 'md': constants.COVER_THUMBNAIL_MEDIUM, 'lg': constants.COVER_THUMBNAIL_LARGE, } cover_resolution = resolutions.get(resolution, None) return get_series_cover_thumbnail(series_id, cover_resolution) @web.route("/robots.txt") def get_robots(): try: return send_from_directory(constants.STATIC_DIR, "robots.txt") except PermissionError: log.error("No permission to access robots.txt file.") abort(403) @web.route("/show//", defaults={'anyname': 'None'}) @web.route("/show///") @login_required_if_no_ano @viewer_required def serve_book(book_id, book_format, anyname): book_format = book_format.split(".")[0] book = calibre_db.get_book(book_id) data = calibre_db.get_book_format(book_id, book_format.upper()) if not data: return "File not in Database" range_header = request.headers.get('Range', None) if not range_header: log.info('Serving book: \'%s\' to %s - %s', data.name, current_user.name, request.headers.get('X-Forwarded-For', request.remote_addr)) if config.config_use_google_drive: try: headers = Headers() headers["Content-Type"] = mimetypes.types_map.get('.' + book_format, "application/octet-stream") if not range_header: headers['Accept-Ranges'] = 'bytes' df = getFileFromEbooksFolder(book.path, data.name + "." + book_format) return do_gdrive_download(df, headers, (book_format.upper() == 'TXT')) except AttributeError as ex: log.error_or_exception(ex) return "File Not Found" else: if book_format.upper() == 'TXT': try: rawdata = open(os.path.join(config.get_book_path(), book.path, data.name + "." + book_format), "rb").read() result = chardet.detect(rawdata) try: text_data = rawdata.decode(result['encoding']).encode('utf-8') except UnicodeDecodeError as e: log.error("Encoding error in text file {}: {}".format(book.id, e)) if "surrogate" in e.reason: text_data = rawdata.decode(result['encoding'], 'surrogatepass').encode('utf-8', 'surrogatepass') else: text_data = rawdata.decode(result['encoding'], 'ignore').encode('utf-8', 'ignore') return make_response(text_data) except FileNotFoundError: log.error("File Not Found") return "File Not Found" # enable byte range read of pdf response = make_response( send_from_directory(os.path.join(config.get_book_path(), book.path), data.name + "." + book_format)) if not range_header: response.headers['Accept-Ranges'] = 'bytes' return response @web.route("/download//", defaults={'anyname': 'None'}) @web.route("/download///") @login_required_if_no_ano @download_required def download_link(book_id, book_format, anyname): client = "kobo" if "Kobo" in request.headers.get('User-Agent') else "" return get_download_link(book_id, book_format, client) @web.route('/send///', methods=["POST"]) @login_required_if_no_ano @download_required def send_to_ereader(book_id, book_format, convert): if not config.get_mail_server_configured(): return make_response(jsonify(type="danger", message=_("Please configure the SMTP mail settings first..."))) elif current_user.kindle_mail: result = send_mail(book_id, book_format, convert, current_user.kindle_mail, config.get_book_path(), current_user.name) if result is None: ub.update_download(book_id, int(current_user.id)) response = [{'type': "success", 'message': _("Success! Book queued for sending to %(eReadermail)s", eReadermail=current_user.kindle_mail)}] else: response = [{'type': "danger", 'message': _("Oops! There was an error sending book: %(res)s", res=result)}] else: response = [{'type': "danger", 'message': _("Oops! Please update your profile with a valid eReader Email.")}] return make_response(jsonify(response)) # ################################### Login Logout ################################################################## @web.route('/register', methods=['POST']) @limiter.limit("40/day", key_func=get_remote_address) @limiter.limit("3/minute", key_func=get_remote_address) def register_post(): if not config.config_public_reg: abort(404) to_save = request.form.to_dict() try: limiter.check() except RateLimitExceeded: flash(_(u"Please wait one minute to register next user"), category="error") return render_title_template('register.html', config=config, title=_("Register"), page="register") except (ConnectionError, Exception) as e: log.error("Connection error to limiter backend: %s", e) flash(_("Connection error to limiter backend, please contact your administrator"), category="error") return render_title_template('register.html', config=config, title=_("Register"), page="register") if current_user is not None and current_user.is_authenticated: return redirect(url_for('web.index')) if not config.get_mail_server_configured(): flash(_("Oops! Email server is not configured, please contact your administrator."), category="error") return render_title_template('register.html', title=_("Register"), page="register") nickname = strip_whitespaces(to_save.get("email", "")) if config.config_register_email else to_save.get('name') if not nickname or not to_save.get("email"): flash(_("Oops! Please complete all fields."), category="error") return render_title_template('register.html', title=_("Register"), page="register") try: nickname = check_username(nickname) email = check_email(to_save.get("email", "")) except Exception as ex: flash(str(ex), category="error") return render_title_template('register.html', title=_("Register"), page="register") content = ub.User() if check_valid_domain(email): content.name = nickname content.email = email password = generate_random_password(config.config_password_min_length) content.password = generate_password_hash(password) content.role = config.config_default_role content.locale = config.config_default_locale content.sidebar_view = config.config_default_show try: ub.session.add(content) ub.session.commit() if feature_support['oauth']: register_user_with_oauth(content) send_registration_mail(strip_whitespaces(to_save.get("email", "")), nickname, password) except Exception: ub.session.rollback() flash(_("Oops! An unknown error occurred. Please try again later."), category="error") return render_title_template('register.html', title=_("Register"), page="register") else: flash(_("Oops! Your Email is not allowed."), category="error") log.warning('Registering failed for user "{}" Email: {}'.format(nickname, to_save.get("email",""))) return render_title_template('register.html', title=_("Register"), page="register") flash(_("Success! Confirmation Email has been sent."), category="success") return redirect(url_for('web.login')) @web.route('/register', methods=['GET']) def register(): if not config.config_public_reg: abort(404) if current_user is not None and current_user.is_authenticated: return redirect(url_for('web.index')) if not config.get_mail_server_configured(): flash(_("Oops! Email server is not configured, please contact your administrator."), category="error") return render_title_template('register.html', title=_("Register"), page="register") if feature_support['oauth']: register_user_with_oauth() return render_title_template('register.html', config=config, title=_("Register"), page="register") def handle_login_user(user, remember, message, category): login_user(user, remember=remember) flash(message, category=category) [limiter.limiter.storage.clear(k.key) for k in limiter.current_limits] return redirect(get_redirect_location(request.form.get('next', None), "web.index")) def render_login(username="", password=""): next_url = request.args.get('next', default=url_for("web.index"), type=str) if url_for("web.logout") == next_url: next_url = url_for("web.index") return render_title_template('login.html', title=_("Login"), next_url=next_url, config=config, username=username, password=password, oauth_check=oauth_check, mail=config.get_mail_server_configured(), page="login") @web.route('/login', methods=['GET']) def login(): if current_user is not None and current_user.is_authenticated: return redirect(url_for('web.index')) if config.config_login_type == constants.LOGIN_LDAP and not services.ldap: log.error(u"Cannot activate LDAP authentication") flash(_(u"Cannot activate LDAP authentication"), category="error") return render_login() @web.route('/login', methods=['POST']) @limiter.limit("40/day", key_func=lambda: strip_whitespaces(request.form.get('username', "")).lower()) @limiter.limit("3/minute", key_func=lambda: strip_whitespaces(request.form.get('username', "")).lower()) def login_post(): form = request.form.to_dict() username = strip_whitespaces(form.get('username', "")).lower().replace("\n","").replace("\r","") try: limiter.check() except RateLimitExceeded: flash(_("Please wait one minute before next login"), category="error") return render_login(username, form.get("password", "")) except (ConnectionError, Exception) as e: log.error("Connection error to limiter backend: %s", e) flash(_("Connection error to limiter backend, please contact your administrator"), category="error") return render_login(username, form.get("password", "")) if current_user is not None and current_user.is_authenticated: return redirect(url_for('web.index')) if config.config_login_type == constants.LOGIN_LDAP and not services.ldap: log.error(u"Cannot activate LDAP authentication") flash(_(u"Cannot activate LDAP authentication"), category="error") user = ub.session.query(ub.User).filter(func.lower(ub.User.name) == username).first() remember_me = bool(form.get('remember_me')) if config.config_login_type == constants.LOGIN_LDAP and services.ldap and user and form['password'] != "": login_result, error = services.ldap.bind_user(username, form['password']) if login_result: log.debug(u"You are now logged in as: '{}'".format(user.name)) return handle_login_user(user, remember_me, _(u"you are now logged in as: '%(nickname)s'", nickname=user.name), "success") elif login_result is None and user and check_password_hash(str(user.password), form['password']) \ and user.name != "Guest": log.info("Local Fallback Login as: '{}'".format(user.name)) return handle_login_user(user, remember_me, _(u"Fallback Login as: '%(nickname)s', " u"LDAP Server not reachable, or user not known", nickname=user.name), "warning") elif login_result is None: log.info(error) flash(_(u"Could not login: %(message)s", message=error), category="error") else: ip_address = request.headers.get('X-Forwarded-For', request.remote_addr) log.warning('LDAP Login failed for user "%s" IP-address: %s', username, ip_address) flash(_(u"Wrong Username or Password"), category="error") else: ip_address = request.headers.get('X-Forwarded-For', request.remote_addr) if form.get('forgot', "") == 'forgot': if user is not None and user.name != "Guest": ret, __ = reset_password(user.id) if ret == 1: flash(_(u"New Password was sent to your email address"), category="info") log.info('Password reset for user "%s" IP-address: %s', username, ip_address) else: log.error(u"An unknown error occurred. Please try again later") flash(_(u"An unknown error occurred. Please try again later."), category="error") else: flash(_(u"Please enter valid username to reset password"), category="error") log.warning('Username missing for password reset IP-address: %s', ip_address) else: if user and check_password_hash(str(user.password), form['password']) and user.name != "Guest": config.config_is_initial = False log.debug(u"You are now logged in as: '{}'".format(user.name)) return handle_login_user(user, remember_me, _(u"You are now logged in as: '%(nickname)s'", nickname=user.name), "success") else: log.warning('Login failed for user "{}" IP-address: {}'.format(username, ip_address)) flash(_(u"Wrong Username or Password"), category="error") return render_login(username, form.get("password", "")) @web.route('/logout') @user_login_required def logout(): if current_user is not None and current_user.is_authenticated: ub.delete_user_session(current_user.id, flask_session.get('_id', "")) logout_user() if feature_support['oauth'] and (config.config_login_type == 2 or config.config_login_type == 3): logout_oauth_user() log.debug("User logged out") if config.config_anonbrowse: location = get_redirect_location(request.args.get('next', None), "web.login") else: location = None if location: return redirect(location) else: return redirect(url_for('web.login')) # ################################### Users own configuration ######################################################### def change_profile(kobo_support, local_oauth_check, oauth_status, translations, languages): to_save = request.form.to_dict() current_user.random_books = 0 try: if current_user.role_passwd() or current_user.role_admin(): if to_save.get("password", "") != "": current_user.password = generate_password_hash(valid_password(to_save.get("password"))) if to_save.get("kindle_mail", current_user.kindle_mail) != current_user.kindle_mail: current_user.kindle_mail = valid_email(to_save.get("kindle_mail")) new_email = valid_email(to_save.get("email", current_user.email)) if not new_email: raise Exception(_("Email can't be empty and has to be a valid Email")) if new_email != current_user.email: current_user.email = check_email(new_email) if current_user.role_admin(): if to_save.get("name", current_user.name) != current_user.name: # Query username, if not existing, change current_user.name = check_username(to_save.get("name")) current_user.random_books = 1 if to_save.get("show_random") == "on" else 0 current_user.default_language = to_save.get("default_language", "all") current_user.locale = to_save.get("locale", "en") old_state = current_user.kobo_only_shelves_sync # 1 -> 0: nothing has to be done # 0 -> 1: all synced books have to be added to archived books, + currently synced shelfs which # don't have to be synced have to be removed (added to Shelf archive) current_user.kobo_only_shelves_sync = int(to_save.get("kobo_only_shelves_sync") == "on") or 0 if old_state == 0 and current_user.kobo_only_shelves_sync == 1: kobo_sync_status.update_on_sync_shelfs(current_user.id) except Exception as ex: flash(str(ex), category="error") return render_title_template("user_edit.html", content=current_user, config=config, translations=translations, profile=1, languages=languages, title=_("%(name)s's Profile", name=current_user.name), page="me", kobo_support=kobo_support, registered_oauth=local_oauth_check, oauth_status=oauth_status) val = 0 for key, __ in to_save.items(): if key.startswith('show'): val += int(key[5:]) current_user.sidebar_view = val if to_save.get("Show_detail_random"): current_user.sidebar_view += constants.DETAIL_RANDOM try: ub.session.commit() flash(_("Success! Profile Updated"), category="success") log.debug("Profile updated") except IntegrityError: ub.session.rollback() flash(_("Oops! An account already exists for this Email."), category="error") log.debug("Found an existing account for this Email") except OperationalError as e: ub.session.rollback() log.error("Database error: %s", e) flash(_("Oops! Database Error: %(error)s.", error=e), category="error") @web.route("/me", methods=["GET", "POST"]) @user_login_required def profile(): languages = calibre_db.speaking_language() translations = get_available_locale() kobo_support = feature_support['kobo'] and config.config_kobo_sync if feature_support['oauth'] and config.config_login_type == 2: oauth_status = get_oauth_status() local_oauth_check = oauth_check else: oauth_status = None local_oauth_check = {} if request.method == "POST": change_profile(kobo_support, local_oauth_check, oauth_status, translations, languages) return render_title_template("user_edit.html", translations=translations, profile=1, languages=languages, content=current_user, config=config, kobo_support=kobo_support, title=_("%(name)s's Profile", name=current_user.name), page="me", registered_oauth=local_oauth_check, oauth_status=oauth_status) # ###################################Show single book ################################################################## @web.route("/read//") @login_required_if_no_ano @viewer_required def read_book(book_id, book_format): book = calibre_db.get_filtered_book(book_id) if not book: flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"), category="error") log.debug("Selected book is unavailable. File does not exist or is not accessible") return redirect(url_for("web.index")) book.ordered_authors = calibre_db.order_authors([book], False) # check if book has a bookmark bookmark = None if current_user.is_authenticated: bookmark = ub.session.query(ub.Bookmark).filter(and_(ub.Bookmark.user_id == int(current_user.id), ub.Bookmark.book_id == book_id, ub.Bookmark.format == book_format.upper())).first() if book_format.lower() == "epub" or book_format.lower() == "kepub": log.debug("Start [k]epub reader for %d", book_id) return render_title_template('read.html', bookid=book_id, title=book.title, bookmark=bookmark, book_format=book_format) elif book_format.lower() == "pdf": log.debug("Start pdf reader for %d", book_id) return render_title_template('readpdf.html', pdffile=book_id, title=book.title) elif book_format.lower() == "txt": log.debug("Start txt reader for %d", book_id) return render_title_template('readtxt.html', txtfile=book_id, title=book.title) elif book_format.lower() in ["djvu", "djv"]: log.debug("Start djvu reader for %d", book_id) return render_title_template('readdjvu.html', djvufile=book_id, title=book.title, extension=book_format.lower()) else: for fileExt in constants.EXTENSIONS_AUDIO: if book_format.lower() == fileExt: entries = calibre_db.get_filtered_book(book_id) log.debug("Start mp3 listening for %d", book_id) return render_title_template('listenmp3.html', mp3file=book_id, audioformat=book_format.lower(), entry=entries, bookmark=bookmark) for fileExt in ["cbr", "cbt", "cbz"]: if book_format.lower() == fileExt: all_name = str(book_id) title = book.title if len(book.series): title = title + " - " + book.series[0].name if book.series_index: title = title + " #" + '{0:.2f}'.format(book.series_index).rstrip('0').rstrip('.') log.debug("Start comic reader for %d", book_id) return render_title_template('readcbr.html', comicfile=all_name, title=title, extension=fileExt, bookmark=bookmark) log.debug("Selected book is unavailable. File does not exist or is not accessible") flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"), category="error") return redirect(url_for("web.index")) @web.route("/book/") @login_required_if_no_ano def show_book(book_id): entries = calibre_db.get_book_read_archived(book_id, config.config_read_column, allow_show_archived=True) if entries: read_book = entries[1] archived_book = entries[2] entry = entries[0] entry.read_status = read_book == ub.ReadBook.STATUS_FINISHED entry.is_archived = archived_book for lang_index in range(0, len(entry.languages)): entry.languages[lang_index].language_name = isoLanguages.get_language_name(get_locale(), entry.languages[ lang_index].lang_code) cc = calibre_db.get_cc_columns(config, filter_config_custom_read=True) book_in_shelves = [] shelves = ub.session.query(ub.BookShelf).filter(ub.BookShelf.book_id == book_id).all() for sh in shelves: book_in_shelves.append(sh.shelf) entry.tags = sort(entry.tags, key=lambda tag: tag.name) entry.ordered_authors = calibre_db.order_authors([entry]) entry.email_share_list = check_send_to_ereader(entry) entry.reader_list = check_read_formats(entry) entry.reader_list_sizes = dict() for data in entry.data: if data.format.lower() in entry.reader_list: entry.reader_list_sizes[data.format.lower()] = data.uncompressed_size entry.audio_entries = [] for media_format in entry.data: if media_format.format.lower() in constants.EXTENSIONS_AUDIO: entry.audio_entries.append(media_format.format.lower()) return render_title_template('detail.html', entry=entry, cc=cc, is_xhr=request.headers.get('X-Requested-With') == 'XMLHttpRequest', title=entry.title, books_shelfs=book_in_shelves, page="book") else: log.debug("Selected book is unavailable. File does not exist or is not accessible") flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"), category="error") return redirect(url_for("web.index"))