1
0
mirror of https://github.com/janeczku/calibre-web synced 2024-11-25 02:57:22 +00:00

Make drive letters available in file picker

This commit is contained in:
Ozzie Isaacs 2022-10-03 15:17:17 +02:00
parent e22ecda137
commit 7eef44f73c

138
cps/admin.py Executable file → Normal file
View File

@ -26,15 +26,16 @@ import base64
import json
import operator
import time
import sys
import string
from datetime import datetime, timedelta
from datetime import time as datetime_time
from functools import wraps
from flask import Blueprint, flash, redirect, url_for, abort, request, make_response, send_from_directory, g, Response
from flask_login import login_required, current_user, logout_user, confirm_login
from flask_babel import gettext as _
from flask_babel import get_locale, format_time, format_datetime, format_timedelta
from flask_babel import get_locale, format_time, format_datetime, format_timedelta
from flask import session as flask_session
from sqlalchemy import and_
from sqlalchemy.orm.attributes import flag_modified
@ -52,27 +53,28 @@ from .services.worker import WorkerThread
from .babel import get_available_translations, get_available_locale, get_user_locale_language
from . import debug_info
log = logger.create()
feature_support = {
'ldap': bool(services.ldap),
'goodreads': bool(services.goodreads_support),
'kobo': bool(services.kobo),
'updater': constants.UPDATER_AVAILABLE,
'gmail': bool(services.gmail),
'scheduler': schedule.use_APScheduler,
'gdrive': gdrive_support
}
'ldap': bool(services.ldap),
'goodreads': bool(services.goodreads_support),
'kobo': bool(services.kobo),
'updater': constants.UPDATER_AVAILABLE,
'gmail': bool(services.gmail),
'scheduler': schedule.use_APScheduler,
'gdrive': gdrive_support
}
try:
import rarfile # pylint: disable=unused-import
feature_support['rar'] = True
except (ImportError, SyntaxError):
feature_support['rar'] = False
try:
from .oauth_bb import oauth_check, oauthblueprints
feature_support['oauth'] = True
except ImportError as err:
log.debug('Cannot import Flask-Dance, login with Oauth will not work: %s', err)
@ -80,7 +82,6 @@ except ImportError as err:
oauthblueprints = []
oauth_check = {}
admi = Blueprint('admin', __name__)
@ -159,6 +160,7 @@ def shutdown():
show_text['text'] = _(u'Unknown command')
return json.dumps(show_text), 400
@admi.route("/metadata_backup", methods=["POST"])
@login_required
@admin_required
@ -206,7 +208,7 @@ def admin():
tz = timedelta(seconds=time.timezone if (time.localtime().tm_isdst == 0) else time.altzone)
form_date = datetime.strptime(commit[:19], "%Y-%m-%dT%H:%M:%S")
if len(commit) > 19: # check if string has timezone
if len(commit) > 19: # check if string has timezone
if commit[19] == '+':
form_date -= timedelta(hours=int(commit[20:22]), minutes=int(commit[23:]))
elif commit[19] == '-':
@ -272,9 +274,9 @@ def calibreweb_alive():
@login_required
@admin_required
def view_configuration():
read_column = calibre_db.session.query(db.CustomColumns)\
read_column = calibre_db.session.query(db.CustomColumns) \
.filter(and_(db.CustomColumns.datatype == 'bool', db.CustomColumns.mark_for_delete == 0)).all()
restrict_columns = calibre_db.session.query(db.CustomColumns)\
restrict_columns = calibre_db.session.query(db.CustomColumns) \
.filter(and_(db.CustomColumns.datatype == 'text', db.CustomColumns.mark_for_delete == 0)).all()
languages = calibre_db.speaking_language()
translations = get_available_locale()
@ -293,11 +295,11 @@ def edit_user_table():
languages = calibre_db.speaking_language()
translations = get_available_locale()
all_user = ub.session.query(ub.User)
tags = calibre_db.session.query(db.Tags)\
.join(db.books_tags_link)\
.join(db.Books)\
tags = calibre_db.session.query(db.Tags) \
.join(db.books_tags_link) \
.join(db.Books) \
.filter(calibre_db.common_filters()) \
.group_by(text('books_tags_link.tag'))\
.group_by(text('books_tags_link.tag')) \
.order_by(db.Tags.name).all()
if config.config_restricted_column:
custom_values = calibre_db.session.query(db.cc_classes[config.config_restricted_column]).all()
@ -474,17 +476,17 @@ def edit_list_user(param):
elif param.endswith('role'):
value = int(vals['field_index'])
if user.name == "Guest" and value in \
[constants.ROLE_ADMIN, constants.ROLE_PASSWD, constants.ROLE_EDIT_SHELFS]:
[constants.ROLE_ADMIN, constants.ROLE_PASSWD, constants.ROLE_EDIT_SHELFS]:
raise Exception(_("Guest can't have this role"))
# check for valid value, last on checks for power of 2 value
if value > 0 and value <= constants.ROLE_VIEWER and (value & value-1 == 0 or value == 1):
if value > 0 and value <= constants.ROLE_VIEWER and (value & value - 1 == 0 or value == 1):
if vals['value'] == 'true':
user.role |= value
elif vals['value'] == 'false':
if value == constants.ROLE_ADMIN:
if not ub.session.query(ub.User).\
filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
ub.User.id != user.id).count():
if not ub.session.query(ub.User). \
filter(ub.User.role.op('&')(constants.ROLE_ADMIN) == constants.ROLE_ADMIN,
ub.User.id != user.id).count():
return Response(
json.dumps([{'type': "danger",
'message': _(u"No admin user remaining, can't remove admin role",
@ -499,7 +501,7 @@ def edit_list_user(param):
if user.name == "Guest" and value == constants.SIDEBAR_READ_AND_UNREAD:
raise Exception(_("Guest can't have this view"))
# check for valid value, last on checks for power of 2 value
if value > 0 and value <= constants.SIDEBAR_LIST and (value & value-1 == 0 or value == 1):
if value > 0 and value <= constants.SIDEBAR_LIST and (value & value - 1 == 0 or value == 1):
if vals['value'] == 'true':
user.sidebar_view |= value
elif vals['value'] == 'false':
@ -652,7 +654,7 @@ def edit_domain(allow):
@admin_required
def add_domain(allow):
domain_name = request.form.to_dict()['domainname'].replace('*', '%').replace('?', '_').lower()
check = ub.session.query(ub.Registration).filter(ub.Registration.domain == domain_name)\
check = ub.session.query(ub.Registration).filter(ub.Registration.domain == domain_name) \
.filter(ub.Registration.allow == allow).first()
if not check:
new_domain = ub.Registration(domain=domain_name, allow=allow)
@ -870,16 +872,16 @@ def delete_restriction(res_type, user_id):
@login_required
@admin_required
def list_restriction(res_type, user_id):
if res_type == 0: # Tags as template
restrict = [{'Element': x, 'type': _('Deny'), 'id': 'd'+str(i)}
if res_type == 0: # Tags as template
restrict = [{'Element': x, 'type': _('Deny'), 'id': 'd' + str(i)}
for i, x in enumerate(config.list_denied_tags()) if x != '']
allow = [{'Element': x, 'type': _('Allow'), 'id': 'a'+str(i)}
allow = [{'Element': x, 'type': _('Allow'), 'id': 'a' + str(i)}
for i, x in enumerate(config.list_allowed_tags()) if x != '']
json_dumps = restrict + allow
elif res_type == 1: # CustomC as template
restrict = [{'Element': x, 'type': _('Deny'), 'id': 'd'+str(i)}
restrict = [{'Element': x, 'type': _('Deny'), 'id': 'd' + str(i)}
for i, x in enumerate(config.list_denied_column_values()) if x != '']
allow = [{'Element': x, 'type': _('Allow'), 'id': 'a'+str(i)}
allow = [{'Element': x, 'type': _('Allow'), 'id': 'a' + str(i)}
for i, x in enumerate(config.list_allowed_column_values()) if x != '']
json_dumps = restrict + allow
elif res_type == 2: # Tags per user
@ -887,9 +889,9 @@ def list_restriction(res_type, user_id):
usr = ub.session.query(ub.User).filter(ub.User.id == user_id).first()
else:
usr = current_user
restrict = [{'Element': x, 'type': _('Deny'), 'id': 'd'+str(i)}
restrict = [{'Element': x, 'type': _('Deny'), 'id': 'd' + str(i)}
for i, x in enumerate(usr.list_denied_tags()) if x != '']
allow = [{'Element': x, 'type': _('Allow'), 'id': 'a'+str(i)}
allow = [{'Element': x, 'type': _('Allow'), 'id': 'a' + str(i)}
for i, x in enumerate(usr.list_allowed_tags()) if x != '']
json_dumps = restrict + allow
elif res_type == 3: # CustomC per user
@ -897,9 +899,9 @@ def list_restriction(res_type, user_id):
usr = ub.session.query(ub.User).filter(ub.User.id == user_id).first()
else:
usr = current_user
restrict = [{'Element': x, 'type': _('Deny'), 'id': 'd'+str(i)}
restrict = [{'Element': x, 'type': _('Deny'), 'id': 'd' + str(i)}
for i, x in enumerate(usr.list_denied_column_values()) if x != '']
allow = [{'Element': x, 'type': _('Allow'), 'id': 'a'+str(i)}
allow = [{'Element': x, 'type': _('Allow'), 'id': 'a' + str(i)}
for i, x in enumerate(usr.list_allowed_column_values()) if x != '']
json_dumps = restrict + allow
else:
@ -929,7 +931,7 @@ def ajax_pathchooser():
def check_valid_read_column(column):
if column != "0":
if not calibre_db.session.query(db.CustomColumns).filter(db.CustomColumns.id == column) \
.filter(and_(db.CustomColumns.datatype == 'bool', db.CustomColumns.mark_for_delete == 0)).all():
.filter(and_(db.CustomColumns.datatype == 'bool', db.CustomColumns.mark_for_delete == 0)).all():
return False
return True
@ -937,7 +939,7 @@ def check_valid_read_column(column):
def check_valid_restricted_column(column):
if column != "0":
if not calibre_db.session.query(db.CustomColumns).filter(db.CustomColumns.id == column) \
.filter(and_(db.CustomColumns.datatype == 'text', db.CustomColumns.mark_for_delete == 0)).all():
.filter(and_(db.CustomColumns.datatype == 'text', db.CustomColumns.mark_for_delete == 0)).all():
return False
return True
@ -965,7 +967,7 @@ def prepare_tags(user, action, tags_name, id_list):
raise Exception(_("Tag not found"))
new_tags_list = [x.name for x in tags]
else:
tags = calibre_db.session.query(db.cc_classes[config.config_restricted_column])\
tags = calibre_db.session.query(db.cc_classes[config.config_restricted_column]) \
.filter(db.cc_classes[config.config_restricted_column].id.in_(id_list)).all()
new_tags_list = [x.value for x in tags]
saved_tags_list = user.__dict__[tags_name].split(",") if len(user.__dict__[tags_name]) else []
@ -978,6 +980,19 @@ def prepare_tags(user, action, tags_name, id_list):
return ",".join(saved_tags_list)
def get_drives(current):
drive_letters = []
for d in string.ascii_uppercase:
if os.path.exists('{}:'.format(d)) and current[0].lower() != d.lower():
drive = "{}:\\".format(d)
data = {"name": drive, "fullpath": drive}
data["sort"] = "_" + data["fullpath"].lower()
data["type"] = "dir"
data["size"] = ""
drive_letters.append(data)
return drive_letters
def pathchooser():
browse_for = "folder"
folder_only = request.args.get('folder', False) == "true"
@ -985,40 +1000,41 @@ def pathchooser():
path = os.path.normpath(request.args.get('path', ""))
if os.path.isfile(path):
oldfile = path
old_file = path
path = os.path.dirname(path)
else:
oldfile = ""
old_file = ""
absolute = False
if os.path.isdir(path):
# if os.path.isabs(path):
cwd = os.path.realpath(path)
absolute = True
# else:
# cwd = os.path.relpath(path)
else:
cwd = os.getcwd()
cwd = os.path.normpath(os.path.realpath(cwd))
parentdir = os.path.dirname(cwd)
parent_dir = os.path.dirname(cwd)
if not absolute:
if os.path.realpath(cwd) == os.path.realpath("/"):
cwd = os.path.relpath(cwd)
else:
cwd = os.path.relpath(cwd) + os.path.sep
parentdir = os.path.relpath(parentdir) + os.path.sep
parent_dir = os.path.relpath(parent_dir) + os.path.sep
if os.path.realpath(cwd) == os.path.realpath("/"):
parentdir = ""
files = []
if os.path.realpath(cwd) == os.path.realpath("/") \
or (sys.platform == "win32" and os.path.realpath(cwd)[1:] == os.path.realpath("/")[1:]):
# we are in root
parent_dir = ""
if sys.platform == "win32":
files = get_drives(cwd)
try:
folders = os.listdir(cwd)
except Exception:
folders = []
files = []
for f in folders:
try:
data = {"name": f, "fullpath": os.path.join(cwd, f)}
@ -1051,9 +1067,9 @@ def pathchooser():
context = {
"cwd": cwd,
"files": files,
"parentdir": parentdir,
"parentdir": parent_dir,
"type": browse_for,
"oldfile": oldfile,
"oldfile": old_file,
"absolute": absolute,
}
return json.dumps(context)
@ -1091,10 +1107,10 @@ def _configuration_gdrive_helper(to_save):
if not gdrive_secrets:
return _configuration_result(_('client_secrets.json Is Not Configured For Web Application'))
gdriveutils.update_settings(
gdrive_secrets['client_id'],
gdrive_secrets['client_secret'],
gdrive_secrets['redirect_uris'][0]
)
gdrive_secrets['client_id'],
gdrive_secrets['client_secret'],
gdrive_secrets['redirect_uris'][0]
)
# always show Google Drive settings, but in case of error deny support
new_gdrive_value = (not gdrive_error) and ("config_use_google_drive" in to_save)
@ -1111,12 +1127,12 @@ def _configuration_oauth_helper(to_save):
reboot_required = False
for element in oauthblueprints:
if to_save["config_" + str(element['id']) + "_oauth_client_id"] != element['oauth_client_id'] \
or to_save["config_" + str(element['id']) + "_oauth_client_secret"] != element['oauth_client_secret']:
or to_save["config_" + str(element['id']) + "_oauth_client_secret"] != element['oauth_client_secret']:
reboot_required = True
element['oauth_client_id'] = to_save["config_" + str(element['id']) + "_oauth_client_id"]
element['oauth_client_secret'] = to_save["config_" + str(element['id']) + "_oauth_client_secret"]
if to_save["config_" + str(element['id']) + "_oauth_client_id"] \
and to_save["config_" + str(element['id']) + "_oauth_client_secret"]:
and to_save["config_" + str(element['id']) + "_oauth_client_secret"]:
active_oauths += 1
element["active"] = 1
else:
@ -1169,7 +1185,7 @@ def _configuration_ldap_helper(to_save):
if not config.config_ldap_provider_url \
or not config.config_ldap_port \
or not config.config_ldap_dn \
or not config.config_ldap_user_object:
or not config.config_ldap_user_object:
return reboot_required, _configuration_result(_('Please Enter a LDAP Provider, '
'Port, DN and User Object Identifier'))
@ -1277,7 +1293,7 @@ def update_mailsettings():
_config_int(to_save, "mail_port")
_config_int(to_save, "mail_use_ssl")
_config_string(to_save, "mail_password")
_config_int(to_save, "mail_size", lambda y: int(y)*1024*1024)
_config_int(to_save, "mail_size", lambda y: int(y) * 1024 * 1024)
config.mail_server = to_save.get('mail_server', "").strip()
config.mail_from = to_save.get('mail_from', "").strip()
config.mail_login = to_save.get('mail_login', "").strip()
@ -1317,7 +1333,7 @@ def edit_scheduledtasks():
duration_field = list()
for n in range(24):
time_field.append((n, format_time(datetime_time(hour=n), format="short",)))
time_field.append((n, format_time(datetime_time(hour=n), format="short", )))
for n in range(5, 65, 5):
t = timedelta(hours=n // 60, minutes=n % 60)
duration_field.append((n, format_timedelta(t, threshold=.97)))
@ -1568,7 +1584,7 @@ def ldap_import_create_user(user, user_data):
ub.session.add(content)
try:
ub.session.commit()
return 1, None # increase no of users
return 1, None # increase no of users
except Exception as ex:
log.warning("Failed to create LDAP user: %s - %s", user, ex)
ub.session.rollback()
@ -1680,7 +1696,7 @@ def _db_configuration_update_helper():
return _db_configuration_result('{}'.format(ex), gdrive_error)
if db_change or not db_valid or not config.db_configured \
or config.config_calibre_dir != to_save["config_calibre_dir"]:
or config.config_calibre_dir != to_save["config_calibre_dir"]:
if not os.path.exists(metadata_db) or not to_save['config_calibre_dir']:
return _db_configuration_result(_('DB Location is not Valid, Please Enter Correct Path'), gdrive_error)
else: