1
0
mirror of https://github.com/janeczku/calibre-web synced 2024-11-09 11:30:00 +00:00

Refactored opds login (proxy header login missing)

This commit is contained in:
Ozzie Isaacs 2024-07-13 12:17:48 +02:00
parent 87c0b15f5f
commit 2d470e0ce1
3 changed files with 87 additions and 63 deletions

View File

@ -25,24 +25,73 @@ import json
from urllib.parse import unquote_plus
from flask import Blueprint, request, render_template, make_response, abort, Response, g
from flask_login import current_user
from functools import wraps
# from flask_login import current_user
from flask_babel import get_locale
from flask_babel import gettext as _
from flask_httpauth import HTTPBasicAuth
from werkzeug.datastructures import Authorization
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy.sql.expression import func, text, or_, and_, true
from sqlalchemy.exc import InvalidRequestError, OperationalError
from . import logger, config, db, calibre_db, ub, isoLanguages, constants
from .usermanagement import requires_basic_auth_if_no_ano
# from .usermanagement import requires_basic_auth_if_no_ano
from .helper import get_download_link, get_book_cover
from .pagination import Pagination
from .web import render_read_books
from . import limiter, services
opds = Blueprint('opds', __name__)
auth = HTTPBasicAuth()
log = logger.create()
@auth.verify_password
def verify_password(username, password):
user = ub.session.query(ub.User).filter(func.lower(ub.User.name) == username.lower()).first()
if config.config_anonbrowse == 1 and user.name.lower() == "guest":
return user
if bool(user and check_password_hash(str(user.password), password)) and user.name != "Guest":
[limiter.limiter.storage.clear(k.key) for k in limiter.current_limits]
return user
else:
ip_address = request.headers.get('X-Forwarded-For', request.remote_addr)
log.warning('OPDS Login failed for user "%s" IP-address: %s', username, ip_address)
return None
def requires_basic_auth_if_no_ano(f):
@wraps(f)
def decorated(*args, **kwargs):
authorisation = auth.get_auth()
if config.config_anonbrowse == 1 and not authorisation:
authorisation = Authorization(
b"Basic", {'username': "Guest", 'password': ""})
status = None
user = auth.authenticate(authorisation, "")
if config.config_login_type == constants.LOGIN_LDAP and services.ldap:
login_result, error = services.ldap.bind_user(authorisation.username, authorisation.password)
if login_result:
[limiter.limiter.storage.clear(k.key) for k in limiter.current_limits]
elif login_result is not None:
log.error(error)
user = None
if user in (False, None):
status = 401
if status:
try:
return auth.auth_error_callback(status)
except TypeError:
return auth.auth_error_callback()
g.flask_httpauth_user = user if user is not True \
else auth.username if auth else None
return f(*args, **kwargs)
return decorated
@opds.route("/opds/")
@opds.route("/opds")
@requires_basic_auth_if_no_ano
@ -94,7 +143,7 @@ def feed_letter_books(book_id):
@opds.route("/opds/new")
@requires_basic_auth_if_no_ano
def feed_new():
if not current_user.check_visibility(constants.SIDEBAR_RECENT):
if not auth.current_user().check_visibility(constants.SIDEBAR_RECENT):
abort(404)
off = request.args.get("offset") or 0
entries, __, pagination = calibre_db.fill_indexpage((int(off) / (int(config.config_books_per_page)) + 1), 0,
@ -106,7 +155,7 @@ def feed_new():
@opds.route("/opds/discover")
@requires_basic_auth_if_no_ano
def feed_discover():
if not current_user.check_visibility(constants.SIDEBAR_RANDOM):
if not auth.current_user().check_visibility(constants.SIDEBAR_RANDOM):
abort(404)
query = calibre_db.generate_linked_query(config.config_read_column, db.Books)
entries = query.filter(calibre_db.common_filters()).order_by(func.random()).limit(config.config_books_per_page)
@ -117,7 +166,7 @@ def feed_discover():
@opds.route("/opds/rated")
@requires_basic_auth_if_no_ano
def feed_best_rated():
if not current_user.check_visibility(constants.SIDEBAR_BEST_RATED):
if not auth.current_user().check_visibility(constants.SIDEBAR_BEST_RATED):
abort(404)
off = request.args.get("offset") or 0
entries, __, pagination = calibre_db.fill_indexpage((int(off) / (int(config.config_books_per_page)) + 1), 0,
@ -128,9 +177,10 @@ def feed_best_rated():
@opds.route("/opds/hot")
@requires_basic_auth_if_no_ano
#@requires_basic_auth_if_no_ano
@auth.login_required
def feed_hot():
if not current_user.check_visibility(constants.SIDEBAR_HOT):
if not auth.current_user().check_visibility(constants.SIDEBAR_HOT):
abort(404)
off = request.args.get("offset") or 0
all_books = ub.session.query(ub.Downloads, func.count(ub.Downloads.book_id)).order_by(
@ -152,17 +202,19 @@ def feed_hot():
@opds.route("/opds/author")
@requires_basic_auth_if_no_ano
#@requires_basic_auth_if_no_ano
@auth.login_required
def feed_authorindex():
if not current_user.check_visibility(constants.SIDEBAR_AUTHOR):
if not auth.current_user().check_visibility(constants.SIDEBAR_AUTHOR):
abort(404)
return render_element_index(db.Authors.sort, db.books_authors_link, 'opds.feed_letter_author')
@opds.route("/opds/author/letter/<book_id>")
@requires_basic_auth_if_no_ano
#@requires_basic_auth_if_no_ano
@auth.login_required
def feed_letter_author(book_id):
if not current_user.check_visibility(constants.SIDEBAR_AUTHOR):
if not auth.current_user().check_visibility(constants.SIDEBAR_AUTHOR):
abort(404)
off = request.args.get("offset") or 0
letter = true() if book_id == "00" else func.upper(db.Authors.sort).startswith(book_id)
@ -185,7 +237,7 @@ def feed_author(book_id):
@opds.route("/opds/publisher")
@requires_basic_auth_if_no_ano
def feed_publisherindex():
if not current_user.check_visibility(constants.SIDEBAR_PUBLISHER):
if not auth.current_user().check_visibility(constants.SIDEBAR_PUBLISHER):
abort(404)
off = request.args.get("offset") or 0
entries = calibre_db.session.query(db.Publishers)\
@ -208,7 +260,7 @@ def feed_publisher(book_id):
@opds.route("/opds/category")
@requires_basic_auth_if_no_ano
def feed_categoryindex():
if not current_user.check_visibility(constants.SIDEBAR_CATEGORY):
if not auth.current_user().check_visibility(constants.SIDEBAR_CATEGORY):
abort(404)
return render_element_index(db.Tags.name, db.books_tags_link, 'opds.feed_letter_category')
@ -216,7 +268,7 @@ def feed_categoryindex():
@opds.route("/opds/category/letter/<book_id>")
@requires_basic_auth_if_no_ano
def feed_letter_category(book_id):
if not current_user.check_visibility(constants.SIDEBAR_CATEGORY):
if not auth.current_user().check_visibility(constants.SIDEBAR_CATEGORY):
abort(404)
off = request.args.get("offset") or 0
letter = true() if book_id == "00" else func.upper(db.Tags.name).startswith(book_id)
@ -241,7 +293,7 @@ def feed_category(book_id):
@opds.route("/opds/series")
@requires_basic_auth_if_no_ano
def feed_seriesindex():
if not current_user.check_visibility(constants.SIDEBAR_SERIES):
if not auth.current_user().check_visibility(constants.SIDEBAR_SERIES):
abort(404)
return render_element_index(db.Series.sort, db.books_series_link, 'opds.feed_letter_series')
@ -249,7 +301,7 @@ def feed_seriesindex():
@opds.route("/opds/series/letter/<book_id>")
@requires_basic_auth_if_no_ano
def feed_letter_series(book_id):
if not current_user.check_visibility(constants.SIDEBAR_SERIES):
if not auth.current_user().check_visibility(constants.SIDEBAR_SERIES):
abort(404)
off = request.args.get("offset") or 0
letter = true() if book_id == "00" else func.upper(db.Series.sort).startswith(book_id)
@ -280,7 +332,7 @@ def feed_series(book_id):
@opds.route("/opds/ratings")
@requires_basic_auth_if_no_ano
def feed_ratingindex():
if not current_user.check_visibility(constants.SIDEBAR_RATING):
if not auth.current_user().check_visibility(constants.SIDEBAR_RATING):
abort(404)
off = request.args.get("offset") or 0
entries = calibre_db.session.query(db.Ratings, func.count('books_ratings_link.book').label('count'),
@ -308,7 +360,7 @@ def feed_ratings(book_id):
@opds.route("/opds/formats")
@requires_basic_auth_if_no_ano
def feed_formatindex():
if not current_user.check_visibility(constants.SIDEBAR_FORMAT):
if not auth.current_user().check_visibility(constants.SIDEBAR_FORMAT):
abort(404)
off = request.args.get("offset") or 0
entries = calibre_db.session.query(db.Data).join(db.Books)\
@ -339,14 +391,14 @@ def feed_format(book_id):
@opds.route("/opds/language/")
@requires_basic_auth_if_no_ano
def feed_languagesindex():
if not current_user.check_visibility(constants.SIDEBAR_LANGUAGE):
if not auth.current_user().check_visibility(constants.SIDEBAR_LANGUAGE):
abort(404)
off = request.args.get("offset") or 0
if current_user.filter_language() == "all":
if auth.current_user().filter_language() == "all":
languages = calibre_db.speaking_language()
else:
languages = calibre_db.session.query(db.Languages).filter(
db.Languages.lang_code == current_user.filter_language()).all()
db.Languages.lang_code == auth.current_user().filter_language()).all()
languages[0].name = isoLanguages.get_language_name(get_locale(), languages[0].lang_code)
pagination = Pagination((int(off) / (int(config.config_books_per_page)) + 1), config.config_books_per_page,
len(languages))
@ -368,11 +420,11 @@ def feed_languages(book_id):
@opds.route("/opds/shelfindex")
@requires_basic_auth_if_no_ano
def feed_shelfindex():
if not (current_user.is_authenticated or g.allow_anonymous):
if not (auth.current_user().is_authenticated or g.allow_anonymous):
abort(404)
off = request.args.get("offset") or 0
shelf = ub.session.query(ub.Shelf).filter(
or_(ub.Shelf.is_public == 1, ub.Shelf.user_id == current_user.id)).order_by(ub.Shelf.name).all()
or_(ub.Shelf.is_public == 1, ub.Shelf.user_id == auth.current_user().id)).order_by(ub.Shelf.name).all()
number = len(shelf)
pagination = Pagination((int(off) / (int(config.config_books_per_page)) + 1), config.config_books_per_page,
number)
@ -382,14 +434,14 @@ def feed_shelfindex():
@opds.route("/opds/shelf/<int:book_id>")
@requires_basic_auth_if_no_ano
def feed_shelf(book_id):
if not (current_user.is_authenticated or g.allow_anonymous):
if not (auth.current_user().is_authenticated or g.allow_anonymous):
abort(404)
off = request.args.get("offset") or 0
if current_user.is_anonymous:
if auth.current_user().is_anonymous:
shelf = ub.session.query(ub.Shelf).filter(ub.Shelf.is_public == 1,
ub.Shelf.id == book_id).first()
else:
shelf = ub.session.query(ub.Shelf).filter(or_(and_(ub.Shelf.user_id == int(current_user.id),
shelf = ub.session.query(ub.Shelf).filter(or_(and_(ub.Shelf.user_id == int(auth.current_user().id),
ub.Shelf.id == book_id),
and_(ub.Shelf.is_public == 1,
ub.Shelf.id == book_id))).first()
@ -422,7 +474,7 @@ def feed_shelf(book_id):
@opds.route("/opds/download/<book_id>/<book_format>/")
@requires_basic_auth_if_no_ano
def opds_download_link(book_id, book_format):
if not current_user.role_download():
if not auth.current_user().role_download():
return abort(403)
if "Kobo" in request.headers.get('User-Agent'):
client = "kobo"
@ -468,7 +520,7 @@ def feed_get_cover(book_id):
@opds.route("/opds/readbooks")
@requires_basic_auth_if_no_ano
def feed_read_books():
if not (current_user.check_visibility(constants.SIDEBAR_READ_AND_UNREAD) and not current_user.is_anonymous):
if not (auth.current_user().check_visibility(constants.SIDEBAR_READ_AND_UNREAD) and not auth.current_user().is_anonymous):
return abort(403)
off = request.args.get("offset") or 0
result, pagination = render_read_books(int(off) / (int(config.config_books_per_page)) + 1, True, True)
@ -478,7 +530,7 @@ def feed_read_books():
@opds.route("/opds/unreadbooks")
@requires_basic_auth_if_no_ano
def feed_unread_books():
if not (current_user.check_visibility(constants.SIDEBAR_READ_AND_UNREAD) and not current_user.is_anonymous):
if not (auth.current_user().check_visibility(constants.SIDEBAR_READ_AND_UNREAD) and not auth.current_user().is_anonymous):
return abort(403)
off = request.args.get("offset") or 0
result, pagination = render_read_books(int(off) / (int(config.config_books_per_page)) + 1, False, True)

View File

@ -19,11 +19,10 @@
from functools import wraps
from sqlalchemy.sql.expression import func
from werkzeug.security import check_password_hash
from flask_login import login_required, login_user
from flask import request, Response
from . import lm, ub, config, constants, services, logger, limiter
from . import lm, ub, config, logger, limiter
log = logger.create()
@ -36,36 +35,8 @@ def login_required_if_no_ano(func):
return decorated_view
def requires_basic_auth_if_no_ano(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or auth.type != 'basic':
if config.config_anonbrowse != 1:
user = load_user_from_reverse_proxy_header(request)
if user:
return f(*args, **kwargs)
return _authenticate()
else:
return f(*args, **kwargs)
if config.config_login_type == constants.LOGIN_LDAP and services.ldap:
login_result, error = services.ldap.bind_user(auth.username, auth.password)
if login_result:
user = _fetch_user_by_name(auth.username)
[limiter.limiter.storage.clear(k.key) for k in limiter.current_limits]
login_user(user)
return f(*args, **kwargs)
elif login_result is not None:
log.error(error)
return _authenticate()
user = _load_user_from_auth_header(auth.username, auth.password)
if not user:
return _authenticate()
return f(*args, **kwargs)
return decorated
def _load_user_from_auth_header(username, password):
'''def _load_user_from_auth_header(username, password):
limiter.check()
user = _fetch_user_by_name(username)
if bool(user and check_password_hash(str(user.password), password)) and user.name != "Guest":
@ -82,7 +53,7 @@ def _authenticate():
return Response(
'Could not verify your access level for that URL.\n'
'You have to login with proper credentials', 401,
{'WWW-Authenticate': 'Basic realm="Login Required"'})
{'WWW-Authenticate': 'Basic realm="Login Required"'})'''
def _fetch_user_by_name(username):

View File

@ -21,3 +21,4 @@ Flask-Limiter>=2.3.0,<3.6.0
regex>=2022.3.2,<2024.6.25
bleach>=6.0.0,<6.2.0
python-magic>=0.4.27,<0.5.0
flask-httpAuth>=4.4.0