From ce48e06c45764eaecbd638e8086f20b2dc804fc2 Mon Sep 17 00:00:00 2001 From: Ozzie Isaacs Date: Sun, 12 Feb 2023 13:10:00 +0100 Subject: [PATCH] Improved limiter --- cps.py | 2 +- cps/kobo_auth.py | 4 +- cps/server.py | 2 +- cps/web.py | 99 +++++++++++++++++++++++++++--------------------- 4 files changed, 60 insertions(+), 47 deletions(-) diff --git a/cps.py b/cps.py index e4f9c520..f97d0f5a 100755 --- a/cps.py +++ b/cps.py @@ -21,7 +21,7 @@ import os import sys -# Add local path to sys.path so we can import cps +# Add local path to sys.path, so we can import cps path = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, path) diff --git a/cps/kobo_auth.py b/cps/kobo_auth.py index efbb3080..3736e4e1 100644 --- a/cps/kobo_auth.py +++ b/cps/kobo_auth.py @@ -166,6 +166,6 @@ def requires_kobo_auth(f): login_user(user) [limiter.limiter.storage.clear(k.key) for k in limiter.current_limits] return f(*args, **kwargs) - log.debug("Received Kobo request without a recognizable auth token.") - return abort(401) + log.debug("Received Kobo request without a recognizable auth token.") + return abort(401) return inner diff --git a/cps/server.py b/cps/server.py index 6b5ac761..d737bb1b 100644 --- a/cps/server.py +++ b/cps/server.py @@ -22,7 +22,6 @@ import errno import signal import socket import subprocess # nosec -from .services.background_scheduler import BackgroundScheduler try: from gevent.pywsgi import WSGIServer @@ -268,6 +267,7 @@ class WebServer(object): @staticmethod def shutdown_scheduler(): + from .services.background_scheduler import BackgroundScheduler scheduler = BackgroundScheduler() if scheduler: scheduler.scheduler.shutdown() diff --git a/cps/web.py b/cps/web.py index bf366f24..b2c99bbe 100644 --- a/cps/web.py +++ b/cps/web.py @@ -28,10 +28,10 @@ from flask import Blueprint, jsonify from flask import request, redirect, send_from_directory, make_response, flash, abort, url_for from flask import session as flask_session from flask_babel import gettext as _ -from flask_babel import lazy_gettext as N_ from flask_babel import get_locale from flask_login import login_user, logout_user, login_required, current_user from flask_limiter import RateLimitExceeded +from flask_limiter.util import get_remote_address from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError from sqlalchemy.sql.expression import text, func, false, not_, and_, or_ from sqlalchemy.orm.attributes import flag_modified @@ -1223,8 +1223,62 @@ def send_to_ereader(book_id, book_format, convert): # ################################### Login Logout ################################################################## +@web.route('/register', methods=['POST']) +@limiter.limit("40/day", key_func=get_remote_address) +@limiter.limit("3/minute", key_func=get_remote_address) +def register_post(): + if not config.config_public_reg: + abort(404) + to_save = request.form.to_dict() + try: + limiter.check() + except RateLimitExceeded: + flash(_(u"Please wait one minute to register next user"), category="error") + return render_title_template('register.html', config=config, title=_("Register"), page="register") + if current_user is not None and current_user.is_authenticated: + return redirect(url_for('web.index')) + if not config.get_mail_server_configured(): + flash(_("Oops! Email server is not configured, please contact your administrator."), category="error") + return render_title_template('register.html', title=_("Register"), page="register") + nickname = to_save.get("email", "").strip() if config.config_register_email else to_save.get('name') + if not nickname or not to_save.get("email"): + flash(_("Oops! Please complete all fields."), category="error") + return render_title_template('register.html', title=_("Register"), page="register") + try: + nickname = check_username(nickname) + email = check_email(to_save.get("email", "")) + except Exception as ex: + flash(str(ex), category="error") + return render_title_template('register.html', title=_("Register"), page="register") -@web.route('/register', methods=['GET', 'POST']) + content = ub.User() + if check_valid_domain(email): + content.name = nickname + content.email = email + password = generate_random_password() + content.password = generate_password_hash(password) + content.role = config.config_default_role + content.locale = config.config_default_locale + content.sidebar_view = config.config_default_show + try: + ub.session.add(content) + ub.session.commit() + if feature_support['oauth']: + register_user_with_oauth(content) + send_registration_mail(to_save.get("email", "").strip(), nickname, password) + except Exception: + ub.session.rollback() + flash(_("Oops! An unknown error occurred. Please try again later."), category="error") + return render_title_template('register.html', title=_("Register"), page="register") + else: + flash(_("Oops! Your Email is not allowed."), category="error") + log.warning('Registering failed for user "{}" Email: {}'.format(nickname, to_save.get("email",""))) + return render_title_template('register.html', title=_("Register"), page="register") + flash(_("Success! Confirmation Email has been sent."), category="success") + return redirect(url_for('web.login')) + + +@web.route('/register', methods=['GET']) def register(): if not config.config_public_reg: abort(404) @@ -1233,47 +1287,6 @@ def register(): if not config.get_mail_server_configured(): flash(_("Oops! Email server is not configured, please contact your administrator."), category="error") return render_title_template('register.html', title=_("Register"), page="register") - - if request.method == "POST": - to_save = request.form.to_dict() - nickname = to_save.get("email", "").strip() if config.config_register_email else to_save.get('name') - if not nickname or not to_save.get("email"): - flash(_("Oops! Please complete all fields."), category="error") - return render_title_template('register.html', title=_("Register"), page="register") - try: - nickname = check_username(nickname) - email = check_email(to_save.get("email", "")) - except Exception as ex: - flash(str(ex), category="error") - return render_title_template('register.html', title=_("Register"), page="register") - - content = ub.User() - if check_valid_domain(email): - content.name = nickname - content.email = email - password = generate_random_password() - content.password = generate_password_hash(password) - content.role = config.config_default_role - content.locale = config.config_default_locale - content.sidebar_view = config.config_default_show - try: - ub.session.add(content) - ub.session.commit() - if feature_support['oauth']: - register_user_with_oauth(content) - send_registration_mail(to_save.get("email", "").strip(), nickname, password) - except Exception: - ub.session.rollback() - flash(_("Oops! An unknown error occurred. Please try again later."), category="error") - return render_title_template('register.html', title=_("Register"), page="register") - else: - flash(_("Oops! Your Email is not allowed."), category="error") - log.warning('Registering failed for user "{}" Email: {}'.format(nickname, - to_save.get("email",""))) - return render_title_template('register.html', title=_("Register"), page="register") - flash(_("Success! Confirmation Email has been sent."), category="success") - return redirect(url_for('web.login')) - if feature_support['oauth']: register_user_with_oauth() return render_title_template('register.html', config=config, title=_("Register"), page="register")