1
0
mirror of https://github.com/janeczku/calibre-web synced 2025-10-16 16:17:39 +00:00

Code cosmetics

This commit is contained in:
Ozzieisaacs
2020-04-19 19:08:58 +02:00
parent 7bb5afa585
commit 24c743d23d
15 changed files with 262 additions and 192 deletions

View File

@@ -96,7 +96,7 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format,
# read settings and append converter task to queue
if kindle_mail:
settings = config.get_mail_settings()
settings['subject'] = _('Send to Kindle') # pretranslate Subject for e-mail
settings['subject'] = _('Send to Kindle') # pretranslate Subject for e-mail
settings['body'] = _(u'This e-mail has been sent via Calibre-Web.')
# text = _(u"%(format)s: %(book)s", format=new_book_format, book=book.title)
else:
@@ -108,7 +108,7 @@ def convert_book_format(book_id, calibrepath, old_book_format, new_book_format,
return None
else:
error_message = _(u"%(format)s not found: %(fn)s",
format=old_book_format, fn=data.name + "." + old_book_format.lower())
format=old_book_format, fn=data.name + "." + old_book_format.lower())
return error_message
@@ -141,36 +141,52 @@ def check_send_to_kindle(entry):
returns all available book formats for sending to Kindle
"""
if len(entry.data):
bookformats=list()
bookformats = list()
if config.config_ebookconverter == 0:
# no converter - only for mobi and pdf formats
for ele in iter(entry.data):
if 'MOBI' in ele.format:
bookformats.append({'format':'Mobi','convert':0,'text':_('Send %(format)s to Kindle',format='Mobi')})
bookformats.append({'format': 'Mobi',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Mobi')})
if 'PDF' in ele.format:
bookformats.append({'format':'Pdf','convert':0,'text':_('Send %(format)s to Kindle',format='Pdf')})
bookformats.append({'format': 'Pdf',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Pdf')})
if 'AZW' in ele.format:
bookformats.append({'format':'Azw','convert':0,'text':_('Send %(format)s to Kindle',format='Azw')})
'''if 'AZW3' in ele.format:
bookformats.append({'format':'Azw3','convert':0,'text':_('Send %(format)s to Kindle',format='Azw3')})'''
bookformats.append({'format': 'Azw',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Azw')})
else:
formats = list()
for ele in iter(entry.data):
formats.append(ele.format)
if 'MOBI' in formats:
bookformats.append({'format': 'Mobi','convert':0,'text':_('Send %(format)s to Kindle',format='Mobi')})
bookformats.append({'format': 'Mobi',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Mobi')})
if 'AZW' in formats:
bookformats.append({'format': 'Azw','convert':0,'text':_('Send %(format)s to Kindle',format='Azw')})
bookformats.append({'format': 'Azw',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Azw')})
if 'PDF' in formats:
bookformats.append({'format': 'Pdf','convert':0,'text':_('Send %(format)s to Kindle',format='Pdf')})
bookformats.append({'format': 'Pdf',
'convert': 0,
'text': _('Send %(format)s to Kindle', format='Pdf')})
if config.config_ebookconverter >= 1:
if 'EPUB' in formats and not 'MOBI' in formats:
bookformats.append({'format': 'Mobi','convert':1,
'text':_('Convert %(orig)s to %(format)s and send to Kindle',orig='Epub',format='Mobi')})
bookformats.append({'format': 'Mobi',
'convert':1,
'text': _('Convert %(orig)s to %(format)s and send to Kindle',
orig='Epub',
format='Mobi')})
if config.config_ebookconverter == 2:
if 'AZW3' in formats and not 'MOBI' in formats:
bookformats.append({'format': 'Mobi','convert':2,
'text':_('Convert %(orig)s to %(format)s and send to Kindle',orig='Azw3',format='Mobi')})
bookformats.append({'format': 'Mobi',
'convert': 2,
'text': _('Convert %(orig)s to %(format)s and send to Kindle',
orig='Azw3',
format='Mobi')})
return bookformats
else:
log.error(u'Cannot find book entry %d', entry.id)
@@ -204,7 +220,6 @@ def send_mail(book_id, book_format, convert, kindle_mail, calibrepath, user_id):
# returns None if success, otherwise errormessage
return convert_book_format(book_id, calibrepath, u'azw3', book_format.lower(), user_id, kindle_mail)
for entry in iter(book.data):
if entry.format.upper() == book_format.upper():
converted_file_name = entry.name + '.' + book_format.lower()
@@ -369,7 +384,7 @@ def update_dir_structure_gdrive(book_id, first_author):
path = book.path
gd.updateDatabaseOnEdit(gFile['id'], book.path) # only child folder affected
else:
error = _(u'File %(file)s not found on Google Drive', file=book.path) # file not found
error = _(u'File %(file)s not found on Google Drive', file=book.path) # file not found
if authordir != new_authordir:
gFile = gd.getFileFromEbooksFolder(os.path.dirname(book.path), new_titledir)
@@ -379,7 +394,7 @@ def update_dir_structure_gdrive(book_id, first_author):
path = book.path
gd.updateDatabaseOnEdit(gFile['id'], book.path)
else:
error = _(u'File %(file)s not found on Google Drive', file=authordir) # file not found
error = _(u'File %(file)s not found on Google Drive', file=authordir) # file not found
# Rename all files from old names to new names
if authordir != new_authordir or titledir != new_titledir:
@@ -395,7 +410,7 @@ def update_dir_structure_gdrive(book_id, first_author):
def delete_book_gdrive(book, book_format):
error= False
error = False
if book_format:
name = ''
for entry in book.data:
@@ -403,12 +418,12 @@ def delete_book_gdrive(book, book_format):
name = entry.name + '.' + book_format
gFile = gd.getFileFromEbooksFolder(book.path, name)
else:
gFile = gd.getFileFromEbooksFolder(os.path.dirname(book.path),book.path.split('/')[1])
gFile = gd.getFileFromEbooksFolder(os.path.dirname(book.path), book.path.split('/')[1])
if gFile:
gd.deleteDatabaseEntry(gFile['id'])
gFile.Trash()
else:
error =_(u'Book path %(path)s not found on Google Drive', path=book.path) # file not found
error = _(u'Book path %(path)s not found on Google Drive', path=book.path) # file not found
return error
@@ -417,24 +432,25 @@ def reset_password(user_id):
password = generate_random_password()
existing_user.password = generate_password_hash(password)
if not config.get_mail_server_configured():
return (2, None)
return 2, None
try:
ub.session.commit()
send_registration_mail(existing_user.email, existing_user.nickname, password, True)
return (1, existing_user.nickname)
return 1, existing_user.nickname
except Exception:
ub.session.rollback()
return (0, None)
return 0, None
def generate_random_password():
s = "abcdefghijklmnopqrstuvwxyz01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%&*()?"
passlen = 8
return "".join(random.sample(s,passlen ))
return "".join(random.sample(s, passlen))
################################## External interface
def update_dir_stucture(book_id, calibrepath, first_author = None):
def update_dir_stucture(book_id, calibrepath, first_author=None):
if config.config_use_google_drive:
return update_dir_structure_gdrive(book_id, first_author)
else:
@@ -454,23 +470,26 @@ def get_cover_on_failure(use_generic_cover):
else:
return None
def get_book_cover(book_id):
book = db.session.query(db.Books).filter(db.Books.id == book_id).filter(common_filters()).first()
return get_book_cover_internal(book, use_generic_cover_on_failure=True)
def get_book_cover_with_uuid(book_uuid,
use_generic_cover_on_failure=True):
use_generic_cover_on_failure=True):
book = db.session.query(db.Books).filter(db.Books.uuid == book_uuid).first()
return get_book_cover_internal(book, use_generic_cover_on_failure)
def get_book_cover_internal(book,
use_generic_cover_on_failure):
use_generic_cover_on_failure):
if book and book.has_cover:
if config.config_use_google_drive:
try:
if not gd.is_gdrive_ready():
return get_cover_on_failure(use_generic_cover_on_failure)
path=gd.get_cover_via_gdrive(book.path)
path = gd.get_cover_via_gdrive(book.path)
if path:
return redirect(path)
else:
@@ -530,7 +549,7 @@ def save_cover(img, book_path):
return False, _("Only jpg/jpeg/png/webp files are supported as coverfile")
# convert to jpg because calibre only supports jpg
if content_type in ('image/png', 'image/webp'):
if hasattr(img,'stream'):
if hasattr(img, 'stream'):
imgc = PILImage.open(img.stream)
else:
imgc = PILImage.open(io.BytesIO(img.content))
@@ -539,7 +558,7 @@ def save_cover(img, book_path):
im.save(tmp_bytesio, format='JPEG')
img._content = tmp_bytesio.getvalue()
else:
if content_type not in ('image/jpeg'):
if content_type not in 'image/jpeg':
log.error("Only jpg/jpeg files are supported as coverfile")
return False, _("Only jpg/jpeg files are supported as coverfile")
@@ -557,7 +576,6 @@ def save_cover(img, book_path):
return save_cover_from_filestorage(os.path.join(config.config_calibre_dir, book_path), "cover.jpg", img)
def do_download_file(book, book_format, data, headers):
if config.config_use_google_drive:
startTime = time.time()
@@ -579,7 +597,6 @@ def do_download_file(book, book_format, data, headers):
##################################
def check_unrar(unrarLocation):
if not unrarLocation:
return
@@ -601,13 +618,12 @@ def check_unrar(unrarLocation):
return 'Error excecuting UnRar'
def json_serial(obj):
"""JSON serializer for objects not serializable by default json code"""
if isinstance(obj, (datetime)):
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, (timedelta)):
if isinstance(obj, timedelta):
return {
'__type__': 'timedelta',
'days': obj.days,
@@ -615,7 +631,7 @@ def json_serial(obj):
'microseconds': obj.microseconds,
}
# return obj.isoformat()
raise TypeError ("Type %s not serializable" % type(obj))
raise TypeError("Type %s not serializable" % type(obj))
# helper function for displaying the runtime of tasks
@@ -637,7 +653,7 @@ def format_runtime(runtime):
# helper function to apply localize status information in tasklist entries
def render_task_status(tasklist):
renderedtasklist=list()
renderedtasklist = list()
for task in tasklist:
if task['user'] == current_user.nickname or current_user.role_admin():
if task['formStarttime']:
@@ -653,7 +669,7 @@ def render_task_status(tasklist):
task['runtime'] = format_runtime(task['formRuntime'])
# localize the task status
if isinstance( task['stat'], int ):
if isinstance( task['stat'], int):
if task['stat'] == STAT_WAITING:
task['status'] = _(u'Waiting')
elif task['stat'] == STAT_FAIL:
@@ -666,14 +682,14 @@ def render_task_status(tasklist):
task['status'] = _(u'Unknown Status')
# localize the task type
if isinstance( task['taskType'], int ):
if isinstance( task['taskType'], int):
if task['taskType'] == TASK_EMAIL:
task['taskMessage'] = _(u'E-mail: ') + task['taskMess']
elif task['taskType'] == TASK_CONVERT:
elif task['taskType'] == TASK_CONVERT:
task['taskMessage'] = _(u'Convert: ') + task['taskMess']
elif task['taskType'] == TASK_UPLOAD:
elif task['taskType'] == TASK_UPLOAD:
task['taskMessage'] = _(u'Upload: ') + task['taskMess']
elif task['taskType'] == TASK_CONVERT_ANY:
elif task['taskType'] == TASK_CONVERT_ANY:
task['taskMessage'] = _(u'Convert: ') + task['taskMess']
else:
task['taskMessage'] = _(u'Unknown Task: ') + task['taskMess']
@@ -709,11 +725,11 @@ def common_filters(allow_show_archived=False):
pos_cc_list = current_user.allowed_column_value.split(',')
pos_content_cc_filter = true() if pos_cc_list == [''] else \
getattr(db.Books, 'custom_column_' + str(config.config_restricted_column)).\
any(db.cc_classes[config.config_restricted_column].value.in_(pos_cc_list))
any(db.cc_classes[config.config_restricted_column].value.in_(pos_cc_list))
neg_cc_list = current_user.denied_column_value.split(',')
neg_content_cc_filter = false() if neg_cc_list == [''] else \
getattr(db.Books, 'custom_column_' + str(config.config_restricted_column)).\
any(db.cc_classes[config.config_restricted_column].value.in_(neg_cc_list))
any(db.cc_classes[config.config_restricted_column].value.in_(neg_cc_list))
else:
pos_content_cc_filter = true()
neg_content_cc_filter = false()
@@ -733,8 +749,9 @@ def tags_filters():
# Creates for all stored languages a translated speaking name in the array for the UI
def speaking_language(languages=None):
if not languages:
languages = db.session.query(db.Languages).join(db.books_languages_link).join(db.Books).filter(common_filters())\
.group_by(text('books_languages_link.lang_code')).all()
languages = db.session.query(db.Languages).join(db.books_languages_link).join(db.Books)\
.filter(common_filters())\
.group_by(text('books_languages_link.lang_code')).all()
for lang in languages:
try:
cur_l = LC.parse(lang.lang_code)
@@ -743,6 +760,7 @@ def speaking_language(languages=None):
lang.name = _(isoLanguages.get(part3=lang.lang_code).name)
return languages
# checks if domain is in database (including wildcards)
# example SELECT * FROM @TABLE WHERE 'abcdefg' LIKE Name;
# from https://code.luasoftware.com/tutorials/flask/execute-raw-sql-in-flask-sqlalchemy/
@@ -787,21 +805,25 @@ def fill_indexpage_with_archived_books(page, database, db_filter, order, allow_s
randm = false()
off = int(int(config.config_books_per_page) * (page - 1))
pagination = Pagination(page, config.config_books_per_page,
len(db.session.query(database).filter(db_filter).filter(common_filters(allow_show_archived)).all()))
entries = db.session.query(database).join(*join, isouter=True).filter(db_filter).filter(common_filters(allow_show_archived)).\
order_by(*order).offset(off).limit(config.config_books_per_page).all()
len(db.session.query(database).filter(db_filter)
.filter(common_filters(allow_show_archived)).all()))
entries = db.session.query(database).join(*join, isouter=True).filter(db_filter)\
.filter(common_filters(allow_show_archived))\
.order_by(*order).offset(off).limit(config.config_books_per_page).all()
for book in entries:
book = order_authors(book)
return entries, randm, pagination
def get_typeahead(database, query, replace=('',''), tag_filter=true()):
def get_typeahead(database, query, replace=('', ''), tag_filter=true()):
query = query or ''
db.session.connection().connection.connection.create_function("lower", 1, lcase)
entries = db.session.query(database).filter(tag_filter).filter(func.lower(database.name).ilike("%" + query + "%")).all()
entries = db.session.query(database).filter(tag_filter).\
filter(func.lower(database.name).ilike("%" + query + "%")).all()
json_dumps = json.dumps([dict(name=r.name.replace(*replace)) for r in entries])
return json_dumps
# read search results from calibre-database and return it (function is used for feed and simple search
def get_search_results(term):
db.session.connection().connection.connection.create_function("lower", 1, lcase)
@@ -820,6 +842,7 @@ def get_search_results(term):
func.lower(db.Books.title).ilike("%" + term + "%")
)).all()
def get_cc_columns():
tmpcc = db.session.query(db.Custom_Columns).filter(db.Custom_Columns.datatype.notin_(db.cc_exceptions)).all()
if config.config_columns_to_ignore:
@@ -832,6 +855,7 @@ def get_cc_columns():
cc = tmpcc
return cc
def get_download_link(book_id, book_format):
book_format = book_format.split(".")[0]
book = db.session.query(db.Books).filter(db.Books.id == book_id).filter(common_filters()).first()
@@ -856,7 +880,8 @@ def get_download_link(book_id, book_format):
else:
abort(404)
def check_exists_book(authr,title):
def check_exists_book(authr, title):
db.session.connection().connection.connection.create_function("lower", 1, lcase)
q = list()
authorterms = re.split(r'\s*&\s*', authr)
@@ -865,11 +890,12 @@ def check_exists_book(authr,title):
return db.session.query(db.Books).filter(
and_(db.Books.authors.any(and_(*q)),
func.lower(db.Books.title).ilike("%" + title + "%")
)).first()
func.lower(db.Books.title).ilike("%" + title + "%")
)).first()
############### Database Helper functions
def lcase(s):
try:
return unidecode.unidecode(s.lower())