1
0
mirror of https://github.com/janeczku/calibre-web synced 2024-11-28 12:30:00 +00:00

Refactored admin.py

This commit is contained in:
Ozzieisaacs 2020-05-09 15:44:53 +02:00
parent 718d50a037
commit e8aedeac36

View File

@ -429,7 +429,6 @@ def delete_restriction(res_type):
return "" return ""
#@admi.route("/ajax/listrestriction/<int:type>/<int:user_id>", defaults={'user_id': '0'})
@admi.route("/ajax/listrestriction/<int:res_type>") @admi.route("/ajax/listrestriction/<int:res_type>")
@login_required @login_required
@admin_required @admin_required
@ -475,6 +474,7 @@ def list_restriction(res_type):
response.headers["Content-Type"] = "application/json; charset=utf-8" response.headers["Content-Type"] = "application/json; charset=utf-8"
return response return response
@admi.route("/config", methods=["GET", "POST"]) @admi.route("/config", methods=["GET", "POST"])
@unconfigured @unconfigured
def basic_configuration(): def basic_configuration():
@ -484,19 +484,23 @@ def basic_configuration():
return _configuration_result() return _configuration_result()
def _configuration_update_helper(): def _config_int(to_save, x):
reboot_required = False return config.set_from_dictionary(to_save, x, int)
db_change = False
to_save = request.form.to_dict()
_config_string = lambda x: config.set_from_dictionary(to_save, x, lambda y: y.strip() if y else y)
_config_int = lambda x: config.set_from_dictionary(to_save, x, int)
_config_checkbox = lambda x: config.set_from_dictionary(to_save, x, lambda y: y == "on", False)
_config_checkbox_int = lambda x: config.set_from_dictionary(to_save, x, lambda y: 1 if (y == "on") else 0, 0)
db_change |= _config_string("config_calibre_dir") def _config_checkbox(to_save, x):
return config.set_from_dictionary(to_save, x, lambda y: y == "on", False)
# Google drive setup
def _config_checkbox_int(to_save, x):
return config.set_from_dictionary(to_save, x, lambda y: 1 if (y == "on") else 0, 0)
def _config_string(to_save, x):
return config.set_from_dictionary(to_save, x, lambda y: y.strip() if y else y)
def _configuration_gdrive_helper(to_save):
if not os.path.isfile(gdriveutils.SETTINGS_YAML): if not os.path.isfile(gdriveutils.SETTINGS_YAML):
config.config_use_google_drive = False config.config_use_google_drive = False
@ -515,138 +519,162 @@ def _configuration_update_helper():
# always show google drive settings, but in case of error deny support # always show google drive settings, but in case of error deny support
config.config_use_google_drive = (not gdriveError) and ("config_use_google_drive" in to_save) config.config_use_google_drive = (not gdriveError) and ("config_use_google_drive" in to_save)
if _config_string("config_google_drive_folder"): if _config_string(to_save, "config_google_drive_folder"):
gdriveutils.deleteDatabaseOnChange() gdriveutils.deleteDatabaseOnChange()
return gdriveError
reboot_required |= _config_int("config_port") def _configuration_oauth_helper(to_save):
active_oauths = 0
for element in oauthblueprints:
if to_save["config_" + str(element['id']) + "_oauth_client_id"] != element['oauth_client_id'] \
or to_save["config_" + str(element['id']) + "_oauth_client_secret"] != element['oauth_client_secret']:
reboot_required = True
element['oauth_client_id'] = to_save["config_" + str(element['id']) + "_oauth_client_id"]
element['oauth_client_secret'] = to_save["config_" + str(element['id']) + "_oauth_client_secret"]
if to_save["config_" + str(element['id']) + "_oauth_client_id"] \
and to_save["config_" + str(element['id']) + "_oauth_client_secret"]:
active_oauths += 1
element["active"] = 1
else:
element["active"] = 0
ub.session.query(ub.OAuthProvider).filter(ub.OAuthProvider.id == element['id']).update(
{"oauth_client_id": to_save["config_" + str(element['id']) + "_oauth_client_id"],
"oauth_client_secret": to_save["config_" + str(element['id']) + "_oauth_client_secret"],
"active": element["active"]})
reboot_required |= _config_string("config_keyfile") def _configuration_logfile_helper(gdriveError, to_save):
reboot_required = False
reboot_required |= _config_int(to_save, "config_log_level")
reboot_required |= _config_string(to_save, "config_logfile")
if not logger.is_valid_logfile(config.config_logfile):
return _configuration_result(_('Logfile Location is not Valid, Please Enter Correct Path'), gdriveError)
reboot_required |= _config_checkbox_int(to_save, "config_access_log")
reboot_required |= _config_string(to_save, "config_access_logfile")
if not logger.is_valid_logfile(config.config_access_logfile):
return _configuration_result(_('Access Logfile Location is not Valid, Please Enter Correct Path'), gdriveError)
return reboot_required
def _configuration_ldap_helper(gdriveError, to_save):
reboot_required = False
reboot_required |= _config_string(to_save, "config_ldap_provider_url")
reboot_required |= _config_int(to_save, "config_ldap_port")
reboot_required |= _config_int(to_save, "config_ldap_authentication")
reboot_required |= _config_string(to_save, "config_ldap_dn")
reboot_required |= _config_string(to_save, "config_ldap_serv_username")
reboot_required |= _config_string(to_save, "config_ldap_user_object")
reboot_required |= _config_string(to_save, "config_ldap_group_object_filter")
reboot_required |= _config_string(to_save, "config_ldap_group_members_field")
reboot_required |= _config_checkbox(to_save, "config_ldap_openldap")
reboot_required |= _config_int(to_save, "config_ldap_encryption")
reboot_required |= _config_string(to_save, "config_ldap_cert_path")
_config_string(to_save, "config_ldap_group_name")
if "config_ldap_serv_password" in to_save and to_save["config_ldap_serv_password"] != "":
reboot_required |= 1
config.set_from_dictionary(to_save, "config_ldap_serv_password", base64.b64encode, encode='UTF-8')
config.save()
if not config.config_ldap_provider_url \
or not config.config_ldap_port \
or not config.config_ldap_dn \
or not config.config_ldap_user_object:
return _configuration_result(_('Please Enter a LDAP Provider, '
'Port, DN and User Object Identifier'), gdriveError)
if config.config_ldap_authentication > constants.LDAP_AUTH_ANONYMOUS:
if config.config_ldap_authentication > constants.LDAP_AUTH_UNAUTHENTICATE:
if not config.config_ldap_serv_username or not bool(config.config_ldap_serv_password):
return _configuration_result('Please Enter a LDAP Service Account and Password', gdriveError)
else:
if not config.config_ldap_serv_username:
return _configuration_result('Please Enter a LDAP Service Account', gdriveError)
if config.config_ldap_group_object_filter:
if config.config_ldap_group_object_filter.count("%s") != 1:
return _configuration_result(_('LDAP Group Object Filter Needs to Have One "%s" Format Identifier'),
gdriveError)
if config.config_ldap_group_object_filter.count("(") != config.config_ldap_group_object_filter.count(")"):
return _configuration_result(_('LDAP Group Object Filter Has Unmatched Parenthesis'),
gdriveError)
if config.config_ldap_user_object.count("%s") != 1:
return _configuration_result(_('LDAP User Object Filter needs to Have One "%s" Format Identifier'),
gdriveError)
if config.config_ldap_user_object.count("(") != config.config_ldap_user_object.count(")"):
return _configuration_result(_('LDAP User Object Filter Has Unmatched Parenthesis'),
gdriveError)
if config.config_ldap_cert_path and not os.path.isdir(config.config_ldap_cert_path):
return _configuration_result(_('LDAP Certificate Location is not Valid, Please Enter Correct Path'),
gdriveError)
return reboot_required
def _configuration_update_helper():
reboot_required = False
db_change = False
to_save = request.form.to_dict()
db_change |= _config_string(to_save, "config_calibre_dir")
# Google drive setup
gdriveError = _configuration_gdrive_helper(to_save)
reboot_required |= _config_int(to_save, "config_port")
reboot_required |= _config_string(to_save, "config_keyfile")
if config.config_keyfile and not os.path.isfile(config.config_keyfile): if config.config_keyfile and not os.path.isfile(config.config_keyfile):
return _configuration_result(_('Keyfile Location is not Valid, Please Enter Correct Path'), gdriveError) return _configuration_result(_('Keyfile Location is not Valid, Please Enter Correct Path'), gdriveError)
reboot_required |= _config_string("config_certfile") reboot_required |= _config_string(to_save, "config_certfile")
if config.config_certfile and not os.path.isfile(config.config_certfile): if config.config_certfile and not os.path.isfile(config.config_certfile):
return _configuration_result(_('Certfile Location is not Valid, Please Enter Correct Path'), gdriveError) return _configuration_result(_('Certfile Location is not Valid, Please Enter Correct Path'), gdriveError)
_config_checkbox_int("config_uploading") _config_checkbox_int(to_save, "config_uploading")
_config_checkbox_int("config_anonbrowse") _config_checkbox_int(to_save, "config_anonbrowse")
_config_checkbox_int("config_public_reg") _config_checkbox_int(to_save, "config_public_reg")
reboot_required |= _config_checkbox_int("config_kobo_sync") reboot_required |= _config_checkbox_int(to_save, "config_kobo_sync")
_config_checkbox_int("config_kobo_proxy") _config_checkbox_int(to_save, "config_kobo_proxy")
_config_string("config_calibre") _config_string(to_save, "config_calibre")
_config_string("config_converterpath") _config_string(to_save, "config_converterpath")
_config_string("config_kepubifypath") _config_string(to_save, "config_kepubifypath")
reboot_required |= _config_int("config_login_type") reboot_required |= _config_int(to_save, "config_login_type")
#LDAP configurator, #LDAP configurator,
if config.config_login_type == constants.LOGIN_LDAP: if config.config_login_type == constants.LOGIN_LDAP:
reboot_required |= _config_string("config_ldap_provider_url") reboot_required |= _configuration_ldap_helper(to_save, gdriveError)
reboot_required |= _config_int("config_ldap_port")
reboot_required |= _config_int("config_ldap_authentication")
reboot_required |= _config_string("config_ldap_dn")
reboot_required |= _config_string("config_ldap_serv_username")
reboot_required |= _config_string("config_ldap_user_object")
reboot_required |= _config_string("config_ldap_group_object_filter")
reboot_required |= _config_string("config_ldap_group_members_field")
reboot_required |= _config_checkbox("config_ldap_openldap")
reboot_required |= _config_int("config_ldap_encryption")
reboot_required |= _config_string("config_ldap_cert_path")
_config_string("config_ldap_group_name")
if "config_ldap_serv_password" in to_save and to_save["config_ldap_serv_password"] != "":
reboot_required |= 1
config.set_from_dictionary(to_save, "config_ldap_serv_password", base64.b64encode, encode='UTF-8')
config.save()
if not config.config_ldap_provider_url \
or not config.config_ldap_port \
or not config.config_ldap_dn \
or not config.config_ldap_user_object:
return _configuration_result(_('Please Enter a LDAP Provider, '
'Port, DN and User Object Identifier'), gdriveError)
if config.config_ldap_authentication > constants.LDAP_AUTH_ANONYMOUS:
if config.config_ldap_authentication > constants.LDAP_AUTH_UNAUTHENTICATE:
if not config.config_ldap_serv_username or not bool(config.config_ldap_serv_password):
return _configuration_result('Please Enter a LDAP Service Account and Password', gdriveError)
else:
if not config.config_ldap_serv_username:
return _configuration_result('Please Enter a LDAP Service Account', gdriveError)
if config.config_ldap_group_object_filter:
if config.config_ldap_group_object_filter.count("%s") != 1:
return _configuration_result(_('LDAP Group Object Filter Needs to Have One "%s" Format Identifier'),
gdriveError)
if config.config_ldap_group_object_filter.count("(") != config.config_ldap_group_object_filter.count(")"):
return _configuration_result(_('LDAP Group Object Filter Has Unmatched Parenthesis'),
gdriveError)
if config.config_ldap_user_object.count("%s") != 1:
return _configuration_result(_('LDAP User Object Filter needs to Have One "%s" Format Identifier'),
gdriveError)
if config.config_ldap_user_object.count("(") != config.config_ldap_user_object.count(")"):
return _configuration_result(_('LDAP User Object Filter Has Unmatched Parenthesis'),
gdriveError)
if config.config_ldap_cert_path and not os.path.isdir(config.config_ldap_cert_path):
return _configuration_result(_('LDAP Certificate Location is not Valid, Please Enter Correct Path'),
gdriveError)
# Remote login configuration # Remote login configuration
_config_checkbox("config_remote_login") _config_checkbox(to_save, "config_remote_login")
if not config.config_remote_login: if not config.config_remote_login:
ub.session.query(ub.RemoteAuthToken).filter(ub.RemoteAuthToken.token_type==0).delete() ub.session.query(ub.RemoteAuthToken).filter(ub.RemoteAuthToken.token_type==0).delete()
# Goodreads configuration # Goodreads configuration
_config_checkbox("config_use_goodreads") _config_checkbox(to_save, "config_use_goodreads")
_config_string("config_goodreads_api_key") _config_string(to_save, "config_goodreads_api_key")
_config_string("config_goodreads_api_secret") _config_string(to_save, "config_goodreads_api_secret")
if services.goodreads_support: if services.goodreads_support:
services.goodreads_support.connect(config.config_goodreads_api_key, services.goodreads_support.connect(config.config_goodreads_api_key,
config.config_goodreads_api_secret, config.config_goodreads_api_secret,
config.config_use_goodreads) config.config_use_goodreads)
_config_int("config_updatechannel") _config_int(to_save, "config_updatechannel")
# Reverse proxy login configuration # Reverse proxy login configuration
_config_checkbox("config_allow_reverse_proxy_header_login") _config_checkbox(to_save, "config_allow_reverse_proxy_header_login")
_config_string("config_reverse_proxy_login_header_name") _config_string(to_save, "config_reverse_proxy_login_header_name")
# GitHub OAuth configuration # OAuth configuration
if config.config_login_type == constants.LOGIN_OAUTH: if config.config_login_type == constants.LOGIN_OAUTH:
active_oauths = 0 _configuration_oauth_helper(to_save)
for element in oauthblueprints:
if to_save["config_" + str(element['id']) + "_oauth_client_id"] != element['oauth_client_id'] \
or to_save["config_" + str(element['id']) + "_oauth_client_secret"] != element['oauth_client_secret']:
reboot_required = True
element['oauth_client_id'] = to_save["config_" + str(element['id']) + "_oauth_client_id"]
element['oauth_client_secret'] = to_save["config_" + str(element['id']) + "_oauth_client_secret"]
if to_save["config_"+str(element['id'])+"_oauth_client_id"] \
and to_save["config_"+str(element['id'])+"_oauth_client_secret"]:
active_oauths += 1
element["active"] = 1
else:
element["active"] = 0
ub.session.query(ub.OAuthProvider).filter(ub.OAuthProvider.id == element['id']).update(
{"oauth_client_id":to_save["config_"+str(element['id'])+"_oauth_client_id"],
"oauth_client_secret":to_save["config_"+str(element['id'])+"_oauth_client_secret"],
"active":element["active"]})
reboot_required |= _config_int("config_log_level")
reboot_required |= _config_string("config_logfile")
if not logger.is_valid_logfile(config.config_logfile):
return _configuration_result(_('Logfile Location is not Valid, Please Enter Correct Path'), gdriveError)
reboot_required |= _config_checkbox_int("config_access_log")
reboot_required |= _config_string("config_access_logfile")
if not logger.is_valid_logfile(config.config_access_logfile):
return _configuration_result(_('Access Logfile Location is not Valid, Please Enter Correct Path'), gdriveError)
reboot_required |= _configuration_logfile_helper(to_save, gdriveError)
# Rarfile Content configuration # Rarfile Content configuration
_config_string("config_rarfile_location") _config_string(to_save, "config_rarfile_location")
unrar_status = helper.check_unrar(config.config_rarfile_location) unrar_status = helper.check_unrar(config.config_rarfile_location)
if unrar_status: if unrar_status:
return _configuration_result(unrar_status, gdriveError) return _configuration_result(unrar_status, gdriveError)
@ -660,7 +688,6 @@ def _configuration_update_helper():
return _configuration_result('%s' % e, gdriveError) return _configuration_result('%s' % e, gdriveError)
if db_change: if db_change:
# reload(db)
if not db.setup_db(config): if not db.setup_db(config):
return _configuration_result(_('DB Location is not Valid, Please Enter Correct Path'), gdriveError) return _configuration_result(_('DB Location is not Valid, Please Enter Correct Path'), gdriveError)
@ -698,6 +725,144 @@ def _configuration_result(error_flash=None, gdriveError=None):
title=_(u"Basic Configuration"), page="config") title=_(u"Basic Configuration"), page="config")
def _handle_new_user(to_save, content,languages, translations, kobo_support):
content.default_language = to_save["default_language"]
# content.mature_content = "Show_mature_content" in to_save
content.locale = to_save.get("locale", content.locale)
content.sidebar_view = sum(int(key[5:]) for key in to_save if key.startswith('show_'))
if "show_detail_random" in to_save:
content.sidebar_view |= constants.DETAIL_RANDOM
content.role = constants.selected_roles(to_save)
if not to_save["nickname"] or not to_save["email"] or not to_save["password"]:
flash(_(u"Please fill out all fields!"), category="error")
return render_title_template("user_edit.html", new_user=1, content=content, translations=translations,
registered_oauth=oauth_check, kobo_support=kobo_support,
title=_(u"Add new user"))
content.password = generate_password_hash(to_save["password"])
existing_user = ub.session.query(ub.User).filter(func.lower(ub.User.nickname) == to_save["nickname"].lower()) \
.first()
existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \
.first()
if not existing_user and not existing_email:
content.nickname = to_save["nickname"]
if config.config_public_reg and not check_valid_domain(to_save["email"]):
flash(_(u"E-mail is not from valid domain"), category="error")
return render_title_template("user_edit.html", new_user=1, content=content, translations=translations,
registered_oauth=oauth_check, kobo_support=kobo_support,
title=_(u"Add new user"))
else:
content.email = to_save["email"]
else:
flash(_(u"Found an existing account for this e-mail address or nickname."), category="error")
return render_title_template("user_edit.html", new_user=1, content=content, translations=translations,
languages=languages, title=_(u"Add new user"), page="newuser",
kobo_support=kobo_support, registered_oauth=oauth_check)
try:
content.allowed_tags = config.config_allowed_tags
content.denied_tags = config.config_denied_tags
content.allowed_column_value = config.config_allowed_column_value
content.denied_column_value = config.config_denied_column_value
ub.session.add(content)
ub.session.commit()
flash(_(u"User '%(user)s' created", user=content.nickname), category="success")
return redirect(url_for('admin.admin'))
except IntegrityError:
ub.session.rollback()
flash(_(u"Found an existing account for this e-mail address or nickname."), category="error")
def _handle_edit_user(to_save, content,languages, translations, kobo_support, downloads):
if "delete" in to_save:
if ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
ub.User.id != content.id).count():
ub.session.query(ub.User).filter(ub.User.id == content.id).delete()
ub.session.commit()
flash(_(u"User '%(nick)s' deleted", nick=content.nickname), category="success")
return redirect(url_for('admin.admin'))
else:
flash(_(u"No admin user remaining, can't delete user", nick=content.nickname), category="error")
return redirect(url_for('admin.admin'))
else:
if not ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
ub.User.id != content.id).count() and \
not 'admin_role' in to_save:
flash(_(u"No admin user remaining, can't remove admin role", nick=content.nickname), category="error")
return redirect(url_for('admin.admin'))
if "password" in to_save and to_save["password"]:
content.password = generate_password_hash(to_save["password"])
anonymous = content.is_anonymous
content.role = constants.selected_roles(to_save)
if anonymous:
content.role |= constants.ROLE_ANONYMOUS
else:
content.role &= ~constants.ROLE_ANONYMOUS
val = [int(k[5:]) for k in to_save if k.startswith('show_')]
sidebar = ub.get_sidebar_config()
for element in sidebar:
value = element['visibility']
if value in val and not content.check_visibility(value):
content.sidebar_view |= value
elif not value in val and content.check_visibility(value):
content.sidebar_view &= ~value
if "Show_detail_random" in to_save:
content.sidebar_view |= constants.DETAIL_RANDOM
else:
content.sidebar_view &= ~constants.DETAIL_RANDOM
if "default_language" in to_save:
content.default_language = to_save["default_language"]
if "locale" in to_save and to_save["locale"]:
content.locale = to_save["locale"]
if to_save["email"] and to_save["email"] != content.email:
existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \
.first()
if not existing_email:
content.email = to_save["email"]
else:
flash(_(u"Found an existing account for this e-mail address."), category="error")
return render_title_template("user_edit.html",
translations=translations,
languages=languages,
mail_configured=config.get_mail_server_configured(),
kobo_support=kobo_support,
new_user=0,
content=content,
downloads=downloads,
registered_oauth=oauth_check,
title=_(u"Edit User %(nick)s", nick=content.nickname), page="edituser")
if "nickname" in to_save and to_save["nickname"] != content.nickname:
# Query User nickname, if not existing, change
if not ub.session.query(ub.User).filter(ub.User.nickname == to_save["nickname"]).scalar():
content.nickname = to_save["nickname"]
else:
flash(_(u"This username is already taken"), category="error")
return render_title_template("user_edit.html",
translations=translations,
languages=languages,
mail_configured=config.get_mail_server_configured(),
new_user=0, content=content,
downloads=downloads,
registered_oauth=oauth_check,
kobo_support=kobo_support,
title=_(u"Edit User %(nick)s", nick=content.nickname),
page="edituser")
if "kindle_mail" in to_save and to_save["kindle_mail"] != content.kindle_mail:
content.kindle_mail = to_save["kindle_mail"]
try:
ub.session.commit()
flash(_(u"User '%(nick)s' updated", nick=content.nickname), category="success")
except IntegrityError:
ub.session.rollback()
flash(_(u"An unknown error occured."), category="error")
@admi.route("/admin/user/new", methods=["GET", "POST"]) @admi.route("/admin/user/new", methods=["GET", "POST"])
@login_required @login_required
@admin_required @admin_required
@ -708,52 +873,7 @@ def new_user():
kobo_support = feature_support['kobo'] and config.config_kobo_sync kobo_support = feature_support['kobo'] and config.config_kobo_sync
if request.method == "POST": if request.method == "POST":
to_save = request.form.to_dict() to_save = request.form.to_dict()
content.default_language = to_save["default_language"] _handle_new_user(to_save, content, languages, translations, kobo_support)
# content.mature_content = "Show_mature_content" in to_save
content.locale = to_save.get("locale", content.locale)
content.sidebar_view = sum(int(key[5:]) for key in to_save if key.startswith('show_'))
if "show_detail_random" in to_save:
content.sidebar_view |= constants.DETAIL_RANDOM
content.role = constants.selected_roles(to_save)
if not to_save["nickname"] or not to_save["email"] or not to_save["password"]:
flash(_(u"Please fill out all fields!"), category="error")
return render_title_template("user_edit.html", new_user=1, content=content, translations=translations,
registered_oauth=oauth_check, kobo_support=kobo_support,
title=_(u"Add new user"))
content.password = generate_password_hash(to_save["password"])
existing_user = ub.session.query(ub.User).filter(func.lower(ub.User.nickname) == to_save["nickname"].lower())\
.first()
existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower())\
.first()
if not existing_user and not existing_email:
content.nickname = to_save["nickname"]
if config.config_public_reg and not check_valid_domain(to_save["email"]):
flash(_(u"E-mail is not from valid domain"), category="error")
return render_title_template("user_edit.html", new_user=1, content=content, translations=translations,
registered_oauth=oauth_check, kobo_support=kobo_support,
title=_(u"Add new user"))
else:
content.email = to_save["email"]
else:
flash(_(u"Found an existing account for this e-mail address or nickname."), category="error")
return render_title_template("user_edit.html", new_user=1, content=content, translations=translations,
languages=languages, title=_(u"Add new user"), page="newuser",
kobo_support=kobo_support, registered_oauth=oauth_check)
try:
content.allowed_tags = config.config_allowed_tags
content.denied_tags = config.config_denied_tags
content.allowed_column_value = config.config_allowed_column_value
content.denied_column_value = config.config_denied_column_value
ub.session.add(content)
ub.session.commit()
flash(_(u"User '%(user)s' created", user=content.nickname), category="success")
return redirect(url_for('admin.admin'))
except IntegrityError:
ub.session.rollback()
flash(_(u"Found an existing account for this e-mail address or nickname."), category="error")
else: else:
content.role = config.config_default_role content.role = config.config_default_role
content.sidebar_view = config.config_default_show content.sidebar_view = config.config_default_show
@ -778,15 +898,12 @@ def update_mailsettings():
to_save = request.form.to_dict() to_save = request.form.to_dict()
log.debug("update_mailsettings %r", to_save) log.debug("update_mailsettings %r", to_save)
_config_string = lambda x: config.set_from_dictionary(to_save, x, lambda y: y.strip() if y else y) _config_string(to_save, "mail_server")
_config_int = lambda x: config.set_from_dictionary(to_save, x, int) _config_int(to_save, "mail_port")
_config_int(to_save, "mail_use_ssl")
_config_string("mail_server") _config_string(to_save, "mail_login")
_config_int("mail_port") _config_string(to_save, "mail_password")
_config_int("mail_use_ssl") _config_string(to_save, "mail_from")
_config_string("mail_login")
_config_string("mail_password")
_config_string("mail_from")
config.save() config.save()
if to_save.get("test"): if to_save.get("test"):
@ -823,96 +940,9 @@ def edit_user(user_id):
downloads.append(downloadbook) downloads.append(downloadbook)
else: else:
ub.delete_download(book.book_id) ub.delete_download(book.book_id)
# ub.session.query(ub.Downloads).filter(book.book_id == ub.Downloads.book_id).delete()
# ub.session.commit()
if request.method == "POST": if request.method == "POST":
to_save = request.form.to_dict() to_save = request.form.to_dict()
if "delete" in to_save: _handle_edit_user(to_save, content, languages, translations, kobo_support, downloads)
if ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
ub.User.id != content.id).count():
ub.session.query(ub.User).filter(ub.User.id == content.id).delete()
ub.session.commit()
flash(_(u"User '%(nick)s' deleted", nick=content.nickname), category="success")
return redirect(url_for('admin.admin'))
else:
flash(_(u"No admin user remaining, can't delete user", nick=content.nickname), category="error")
return redirect(url_for('admin.admin'))
else:
if not ub.session.query(ub.User).filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
ub.User.id != content.id).count() and \
not 'admin_role' in to_save:
flash(_(u"No admin user remaining, can't remove admin role", nick=content.nickname), category="error")
return redirect(url_for('admin.admin'))
if "password" in to_save and to_save["password"]:
content.password = generate_password_hash(to_save["password"])
anonymous = content.is_anonymous
content.role = constants.selected_roles(to_save)
if anonymous:
content.role |= constants.ROLE_ANONYMOUS
else:
content.role &= ~constants.ROLE_ANONYMOUS
val = [int(k[5:]) for k in to_save if k.startswith('show_')]
sidebar = ub.get_sidebar_config()
for element in sidebar:
value = element['visibility']
if value in val and not content.check_visibility(value):
content.sidebar_view |= value
elif not value in val and content.check_visibility(value):
content.sidebar_view &= ~value
if "Show_detail_random" in to_save:
content.sidebar_view |= constants.DETAIL_RANDOM
else:
content.sidebar_view &= ~constants.DETAIL_RANDOM
if "default_language" in to_save:
content.default_language = to_save["default_language"]
if "locale" in to_save and to_save["locale"]:
content.locale = to_save["locale"]
if to_save["email"] and to_save["email"] != content.email:
existing_email = ub.session.query(ub.User).filter(ub.User.email == to_save["email"].lower()) \
.first()
if not existing_email:
content.email = to_save["email"]
else:
flash(_(u"Found an existing account for this e-mail address."), category="error")
return render_title_template("user_edit.html",
translations=translations,
languages=languages,
mail_configured = config.get_mail_server_configured(),
kobo_support=kobo_support,
new_user=0,
content=content,
downloads=downloads,
registered_oauth=oauth_check,
title=_(u"Edit User %(nick)s", nick=content.nickname), page="edituser")
if "nickname" in to_save and to_save["nickname"] != content.nickname:
# Query User nickname, if not existing, change
if not ub.session.query(ub.User).filter(ub.User.nickname == to_save["nickname"]).scalar():
content.nickname = to_save["nickname"]
else:
flash(_(u"This username is already taken"), category="error")
return render_title_template("user_edit.html",
translations=translations,
languages=languages,
mail_configured=config.get_mail_server_configured(),
new_user=0, content=content,
downloads=downloads,
registered_oauth=oauth_check,
kobo_support=kobo_support,
title=_(u"Edit User %(nick)s", nick=content.nickname),
page="edituser")
if "kindle_mail" in to_save and to_save["kindle_mail"] != content.kindle_mail:
content.kindle_mail = to_save["kindle_mail"]
try:
ub.session.commit()
flash(_(u"User '%(nick)s' updated", nick=content.nickname), category="success")
except IntegrityError:
ub.session.rollback()
flash(_(u"An unknown error occured."), category="error")
return render_title_template("user_edit.html", return render_title_template("user_edit.html",
translations=translations, translations=translations,
languages=languages, languages=languages,