mirror of
				https://github.com/janeczku/calibre-web
				synced 2025-10-31 15:23:02 +00:00 
			
		
		
		
	Upload (multiple) book formats with progress and merge the corresponding metadata into the book
This commit is contained in:
		
							
								
								
									
										533
									
								
								cps/editbooks.py
									
									
									
									
									
								
							
							
						
						
									
										533
									
								
								cps/editbooks.py
									
									
									
									
									
								
							| @@ -24,7 +24,7 @@ import os | ||||
| from datetime import datetime, timezone | ||||
| import json | ||||
| from shutil import copyfile | ||||
| from uuid import uuid4 | ||||
|  | ||||
| from markupsafe import escape, Markup  # dependency of flask | ||||
| from functools import wraps | ||||
|  | ||||
| @@ -97,135 +97,7 @@ def show_edit_book(book_id): | ||||
| @login_required_if_no_ano | ||||
| @edit_required | ||||
| def edit_book(book_id): | ||||
|     modify_date = False | ||||
|     edit_error = False | ||||
|  | ||||
|     # create the function for sorting... | ||||
|     calibre_db.update_title_sort(config) | ||||
|  | ||||
|     book = calibre_db.get_filtered_book(book_id, allow_show_archived=True) | ||||
|     # Book not found | ||||
|     if not book: | ||||
|         flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"), | ||||
|               category="error") | ||||
|         return redirect(url_for("web.index")) | ||||
|  | ||||
|     to_save = request.form.to_dict() | ||||
|  | ||||
|     try: | ||||
|         # Update folder of book on local disk | ||||
|         edited_books_id = None | ||||
|         title_author_error = None | ||||
|         # handle book title change | ||||
|         title_change = handle_title_on_edit(book, to_save["book_title"]) | ||||
|         # handle book author change | ||||
|         input_authors, author_change = handle_author_on_edit(book, to_save["author_name"]) | ||||
|         if author_change or title_change: | ||||
|             edited_books_id = book.id | ||||
|             modify_date = True | ||||
|             title_author_error = helper.update_dir_structure(edited_books_id, | ||||
|                                                              config.get_book_path(), | ||||
|                                                              input_authors[0]) | ||||
|         if title_author_error: | ||||
|             flash(title_author_error, category="error") | ||||
|             calibre_db.session.rollback() | ||||
|             book = calibre_db.get_filtered_book(book_id, allow_show_archived=True) | ||||
|  | ||||
|         # handle upload covers from local disk | ||||
|         cover_upload_success = upload_cover(request, book) | ||||
|         if cover_upload_success: | ||||
|             book.has_cover = 1 | ||||
|             modify_date = True | ||||
|         meta ={} | ||||
|         # upload new covers or new file formats to google drive | ||||
|         if config.config_use_google_drive: | ||||
|             gdriveutils.updateGdriveCalibreFromLocal() | ||||
|  | ||||
|         if to_save.get("cover_url", None): | ||||
|             if not current_user.role_upload(): | ||||
|                 edit_error = True | ||||
|                 flash(_("User has no rights to upload cover"), category="error") | ||||
|             if to_save["cover_url"].endswith('/static/generic_cover.jpg'): | ||||
|                 book.has_cover = 0 | ||||
|             else: | ||||
|                 result, error = helper.save_cover_from_url(to_save["cover_url"].strip(), book.path) | ||||
|                 if result is True: | ||||
|                     book.has_cover = 1 | ||||
|                     modify_date = True | ||||
|                     helper.replace_cover_thumbnail_cache(book.id) | ||||
|                 else: | ||||
|                     flash(error, category="error") | ||||
|  | ||||
|         # Add default series_index to book | ||||
|         modify_date |= edit_book_series_index(to_save["series_index"], book) | ||||
|         # Handle book comments/description | ||||
|         modify_date |= edit_book_comments(Markup(to_save['description']).unescape(), book) | ||||
|         # Handle identifiers | ||||
|         input_identifiers = identifier_list(to_save, book) | ||||
|         modification, warning = modify_identifiers(input_identifiers, book.identifiers, calibre_db.session) | ||||
|         if warning: | ||||
|             flash(_("Identifiers are not Case Sensitive, Overwriting Old Identifier"), category="warning") | ||||
|         modify_date |= modification | ||||
|         # Handle book tags | ||||
|         modify_date |= edit_book_tags(to_save['tags'], book) | ||||
|         # Handle book series | ||||
|         modify_date |= edit_book_series(to_save["series"], book) | ||||
|         # handle book publisher | ||||
|         modify_date |= edit_book_publisher(to_save['publisher'], book) | ||||
|         # handle book languages | ||||
|         try: | ||||
|             modify_date |= edit_book_languages(to_save['languages'], book) | ||||
|         except ValueError as e: | ||||
|             flash(str(e), category="error") | ||||
|             edit_error = True | ||||
|         # handle book ratings | ||||
|         modify_date |= edit_book_ratings(to_save, book) | ||||
|         # handle cc data | ||||
|         modify_date |= edit_all_cc_data(book_id, book, to_save) | ||||
|  | ||||
|         if to_save.get("pubdate", None): | ||||
|             try: | ||||
|                 book.pubdate = datetime.strptime(to_save["pubdate"], "%Y-%m-%d") | ||||
|             except ValueError as e: | ||||
|                 book.pubdate = db.Books.DEFAULT_PUBDATE | ||||
|                 flash(str(e), category="error") | ||||
|                 edit_error = True | ||||
|         else: | ||||
|             book.pubdate = db.Books.DEFAULT_PUBDATE | ||||
|  | ||||
|         if modify_date: | ||||
|             book.last_modified = datetime.now(timezone.utc) | ||||
|             kobo_sync_status.remove_synced_book(edited_books_id, all=True) | ||||
|             calibre_db.set_metadata_dirty(book.id) | ||||
|  | ||||
|         calibre_db.session.merge(book) | ||||
|         calibre_db.session.commit() | ||||
|         if config.config_use_google_drive: | ||||
|             gdriveutils.updateGdriveCalibreFromLocal() | ||||
|         if meta is not False \ | ||||
|             and edit_error is not True \ | ||||
|                 and title_author_error is not True \ | ||||
|                 and cover_upload_success is not False: | ||||
|             flash(_("Metadata successfully updated"), category="success") | ||||
|         if "detail_view" in to_save: | ||||
|             return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|         else: | ||||
|             return render_edit_book(book_id) | ||||
|     except ValueError as e: | ||||
|         log.error_or_exception("Error: {}".format(e)) | ||||
|         calibre_db.session.rollback() | ||||
|         flash(str(e), category="error") | ||||
|         return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|     except (OperationalError, IntegrityError, StaleDataError, InterfaceError) as e: | ||||
|         log.error_or_exception("Database error: {}".format(e)) | ||||
|         calibre_db.session.rollback() | ||||
|         flash(_("Oops! Database Error: %(error)s.", error=e.orig if hasattr(e, "orig") else e), category="error") | ||||
|         return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|     except Exception as ex: | ||||
|         log.error_or_exception(ex) | ||||
|         calibre_db.session.rollback() | ||||
|         flash(_("Error editing book: {}".format(ex)), category="error") | ||||
|         return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|     return do_edit_book(book_id) | ||||
|  | ||||
|  | ||||
| @editbook.route("/upload", methods=["POST"]) | ||||
| @@ -233,41 +105,14 @@ def edit_book(book_id): | ||||
| @upload_required | ||||
| def upload(): | ||||
|     if len(request.files.getlist("btn-upload-format")): | ||||
|         # create the function for sorting... | ||||
|         calibre_db.update_title_sort(config) | ||||
|         book_id = request.form.get('book_id', -1) | ||||
|  | ||||
|         book = calibre_db.get_filtered_book(book_id, allow_show_archived=True) | ||||
|         # Book not found | ||||
|         if not book: | ||||
|             flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"), | ||||
|                   category="error") | ||||
|             return redirect(url_for("web.index")) | ||||
|  | ||||
|         # handle upload other formats from local disk | ||||
|         for requested_file in request.files.getlist("btn-upload-format"): | ||||
|             meta = upload_single_file(requested_file, book, book_id) | ||||
|             # save data to database, reread data | ||||
|             calibre_db.session.commit() | ||||
|  | ||||
|         resp = {"location": url_for('edit-book.show_edit_book', book_id=book_id)} | ||||
|         return Response(json.dumps(resp), mimetype='application/json') | ||||
|  | ||||
|         # only merge metadata if file was uploaded and no error occurred (meta equals not false or none) | ||||
|  | ||||
|  | ||||
|         return do_edit_book(book_id, request.files.getlist("btn-upload-format")) | ||||
|     elif len(request.files.getlist("btn-upload")): | ||||
|         for requested_file in request.files.getlist("btn-upload"): | ||||
|             try: | ||||
|                 modify_date = False | ||||
|                 # create the function for sorting... | ||||
|                 calibre_db.update_title_sort(config) | ||||
|                 try: | ||||
|                     # sqlalchemy 2.0 | ||||
|                     uuid_func = calibre_db.session.connection().connection.driver_connection | ||||
|                 except AttributeError: | ||||
|                     uuid_func = calibre_db.session.connection().connection.connection | ||||
|                 uuid_func.create_function('uuid4', 0,lambda: str(uuid4())) | ||||
|                 calibre_db.create_functions(config) | ||||
|                 meta, error = file_handling_on_upload(requested_file) | ||||
|                 if error: | ||||
|                     return error | ||||
| @@ -592,22 +437,163 @@ def table_xchange_author_title(): | ||||
|     return "" | ||||
|  | ||||
|  | ||||
| def do_edit_book(book_id, upload_formats=None): | ||||
|     modify_date = False | ||||
|     edit_error = False | ||||
|  | ||||
|     # create the function for sorting... | ||||
|     calibre_db.create_functions(config) | ||||
|  | ||||
|     book = calibre_db.get_filtered_book(book_id, allow_show_archived=True) | ||||
|     # Book not found | ||||
|     if not book: | ||||
|         flash(_("Oops! Selected book is unavailable. File does not exist or is not accessible"), | ||||
|               category="error") | ||||
|         return redirect(url_for("web.index")) | ||||
|  | ||||
|     to_save = request.form.to_dict() | ||||
|  | ||||
|     try: | ||||
|         # Update folder of book on local disk | ||||
|         edited_books_id = None | ||||
|         title_author_error = None | ||||
|         upload_format = False | ||||
|         # handle book title change | ||||
|         if "book_title" in to_save: | ||||
|             title_change = handle_title_on_edit(book, to_save["book_title"]) | ||||
|         # handle book author change | ||||
|         if not upload_formats: | ||||
|             input_authors, author_change = handle_author_on_edit(book, to_save["author_name"]) | ||||
|             if author_change or title_change: | ||||
|                 edited_books_id = book.id | ||||
|                 modify_date = True | ||||
|                 title_author_error = helper.update_dir_structure(edited_books_id, | ||||
|                                                                  config.get_book_path(), | ||||
|                                                                  input_authors[0]) | ||||
|             if title_author_error: | ||||
|                 flash(title_author_error, category="error") | ||||
|                 calibre_db.session.rollback() | ||||
|                 book = calibre_db.get_filtered_book(book_id, allow_show_archived=True) | ||||
|  | ||||
|             # handle book ratings | ||||
|             modify_date |= edit_book_ratings(to_save, book) | ||||
|             meta = True | ||||
|         else: | ||||
|             # handle upload other formats from local disk | ||||
|             meta = upload_single_file(upload_formats, book, book_id) | ||||
|             # only merge metadata if file was uploaded and no error occurred (meta equals not false or none) | ||||
|             if meta: | ||||
|                 upload_format = merge_metadata(to_save, meta) | ||||
|         # handle upload covers from local disk | ||||
|         cover_upload_success = upload_cover(request, book) | ||||
|         if cover_upload_success: | ||||
|             book.has_cover = 1 | ||||
|             modify_date = True | ||||
|  | ||||
|         # upload new covers or new file formats to google drive | ||||
|         if config.config_use_google_drive: | ||||
|             gdriveutils.updateGdriveCalibreFromLocal() | ||||
|  | ||||
|         if to_save.get("cover_url", None): | ||||
|             if not current_user.role_upload(): | ||||
|                 edit_error = True | ||||
|                 flash(_("User has no rights to upload cover"), category="error") | ||||
|             if to_save["cover_url"].endswith('/static/generic_cover.jpg'): | ||||
|                 book.has_cover = 0 | ||||
|             else: | ||||
|                 result, error = helper.save_cover_from_url(to_save["cover_url"].strip(), book.path) | ||||
|                 if result is True: | ||||
|                     book.has_cover = 1 | ||||
|                     modify_date = True | ||||
|                     helper.replace_cover_thumbnail_cache(book.id) | ||||
|                 else: | ||||
|                     flash(error, category="error") | ||||
|  | ||||
|         # Add default series_index to book | ||||
|         modify_date |= edit_book_series_index(to_save.get("series_index"), book) | ||||
|         # Handle book comments/description | ||||
|         modify_date |= edit_book_comments(Markup(to_save.get('description')).unescape(), book) | ||||
|         # Handle identifiers | ||||
|         input_identifiers = identifier_list(to_save, book) | ||||
|         modification, warning = modify_identifiers(input_identifiers, book.identifiers, calibre_db.session) | ||||
|         if warning: | ||||
|             flash(_("Identifiers are not Case Sensitive, Overwriting Old Identifier"), category="warning") | ||||
|         modify_date |= modification | ||||
|         # Handle book tags | ||||
|         modify_date |= edit_book_tags(to_save.get('tags'), book) | ||||
|         # Handle book series | ||||
|         modify_date |= edit_book_series(to_save.get("series"), book) | ||||
|         # handle book publisher | ||||
|         modify_date |= edit_book_publisher(to_save.get('publisher'), book) | ||||
|         # handle book languages | ||||
|         try: | ||||
|             modify_date |= edit_book_languages(to_save.get('languages'), book, upload_format) | ||||
|         except ValueError as e: | ||||
|             flash(str(e), category="error") | ||||
|             edit_error = True | ||||
|         # handle cc data | ||||
|         modify_date |= edit_all_cc_data(book_id, book, to_save) | ||||
|  | ||||
|         if to_save.get("pubdate") is not None: | ||||
|             if to_save.get("pubdate"): | ||||
|                 try: | ||||
|                     book.pubdate = datetime.strptime(to_save["pubdate"], "%Y-%m-%d") | ||||
|                 except ValueError as e: | ||||
|                     book.pubdate = db.Books.DEFAULT_PUBDATE | ||||
|                     flash(str(e), category="error") | ||||
|                     edit_error = True | ||||
|             else: | ||||
|                 book.pubdate = db.Books.DEFAULT_PUBDATE | ||||
|  | ||||
|         if modify_date: | ||||
|             book.last_modified = datetime.now(timezone.utc) | ||||
|             kobo_sync_status.remove_synced_book(edited_books_id, all=True) | ||||
|             calibre_db.set_metadata_dirty(book.id) | ||||
|  | ||||
|         calibre_db.session.merge(book) | ||||
|         calibre_db.session.commit() | ||||
|         if config.config_use_google_drive: | ||||
|             gdriveutils.updateGdriveCalibreFromLocal() | ||||
|         if upload_formats: | ||||
|             resp = {"location": url_for('edit-book.show_edit_book', book_id=book_id)} | ||||
|             return Response(json.dumps(resp), mimetype='application/json') | ||||
|  | ||||
|         # if meta is not False \ | ||||
|         if edit_error is not True and title_author_error is not True and cover_upload_success is not False: | ||||
|             flash(_("Metadata successfully updated"), category="success") | ||||
|         if "detail_view" in to_save: | ||||
|             return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|         else: | ||||
|             return render_edit_book(book_id) | ||||
|     except ValueError as e: | ||||
|         log.error_or_exception("Error: {}".format(e)) | ||||
|         calibre_db.session.rollback() | ||||
|         flash(str(e), category="error") | ||||
|         return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|     except (OperationalError, IntegrityError, StaleDataError, InterfaceError) as e: | ||||
|         log.error_or_exception("Database error: {}".format(e)) | ||||
|         calibre_db.session.rollback() | ||||
|         flash(_("Oops! Database Error: %(error)s.", error=e.orig if hasattr(e, "orig") else e), category="error") | ||||
|         return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|     except Exception as ex: | ||||
|         log.error_or_exception(ex) | ||||
|         calibre_db.session.rollback() | ||||
|         flash(_("Error editing book: {}".format(ex)), category="error") | ||||
|         return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|  | ||||
|  | ||||
| def merge_metadata(to_save, meta): | ||||
|     if to_save.get('author_name', "") == _('Unknown'): | ||||
|         to_save['author_name'] = '' | ||||
|     if to_save.get('book_title', "") == _('Unknown'): | ||||
|         to_save['book_title'] = '' | ||||
|     if not to_save["languages"] and meta.languages: | ||||
|     if not to_save.get("languages") and meta.languages: | ||||
|         upload_language = True | ||||
|     else: | ||||
|         upload_language = False | ||||
|     for s_field, m_field in [ | ||||
|             ('tags', 'tags'), ('author_name', 'author'), ('series', 'series'), | ||||
|             ('series_index', 'series_id'), ('languages', 'languages'), | ||||
|             ('book_title', 'title')]: | ||||
|         to_save[s_field] = to_save[s_field] or getattr(meta, m_field, '') | ||||
|     to_save["description"] = to_save["description"] or Markup( | ||||
|         getattr(meta, 'description', '')).unescape() | ||||
|             ('book_title', 'title'), ('description', 'description'),]: | ||||
|         val = getattr(meta, m_field, '') | ||||
|         if val: | ||||
|             to_save[s_field] = val | ||||
|     return upload_language | ||||
|  | ||||
| def identifier_list(to_save, book): | ||||
| @@ -1019,84 +1005,93 @@ def edit_book_ratings(to_save, book): | ||||
|  | ||||
|  | ||||
| def edit_book_tags(tags, book): | ||||
|     input_tags = tags.split(',') | ||||
|     input_tags = list(map(lambda it: strip_whitespaces(it), input_tags)) | ||||
|     # Remove duplicates | ||||
|     input_tags = helper.uniq(input_tags) | ||||
|     return modify_database_object(input_tags, book.tags, db.Tags, calibre_db.session, 'tags') | ||||
|  | ||||
|     if tags: | ||||
|         input_tags = tags.split(',') | ||||
|         input_tags = list(map(lambda it: strip_whitespaces(it), input_tags)) | ||||
|         # Remove duplicates | ||||
|         input_tags = helper.uniq(input_tags) | ||||
|         return modify_database_object(input_tags, book.tags, db.Tags, calibre_db.session, 'tags') | ||||
|     return False | ||||
|  | ||||
| def edit_book_series(series, book): | ||||
|     input_series = [strip_whitespaces(series)] | ||||
|     input_series = [x for x in input_series if x != ''] | ||||
|     return modify_database_object(input_series, book.series, db.Series, calibre_db.session, 'series') | ||||
|     if series: | ||||
|         input_series = [strip_whitespaces(series)] | ||||
|         input_series = [x for x in input_series if x != ''] | ||||
|         return modify_database_object(input_series, book.series, db.Series, calibre_db.session, 'series') | ||||
|     return False | ||||
|  | ||||
|  | ||||
| def edit_book_series_index(series_index, book): | ||||
|     # Add default series_index to book | ||||
|     modify_date = False | ||||
|     series_index = series_index or '1' | ||||
|     if not series_index.replace('.', '', 1).isdigit(): | ||||
|         flash(_("Seriesindex: %(seriesindex)s is not a valid number, skipping", seriesindex=series_index), category="warning") | ||||
|         return False | ||||
|     if str(book.series_index) != series_index: | ||||
|         book.series_index = series_index | ||||
|         modify_date = True | ||||
|     return modify_date | ||||
|     if series_index: | ||||
|         # Add default series_index to book | ||||
|         modify_date = False | ||||
|         series_index = series_index or '1' | ||||
|         if not series_index.replace('.', '', 1).isdigit(): | ||||
|             flash(_("Seriesindex: %(seriesindex)s is not a valid number, skipping", seriesindex=series_index), category="warning") | ||||
|             return False | ||||
|         if str(book.series_index) != series_index: | ||||
|             book.series_index = series_index | ||||
|             modify_date = True | ||||
|         return modify_date | ||||
|     return False | ||||
|  | ||||
|  | ||||
| # Handle book comments/description | ||||
| def edit_book_comments(comments, book): | ||||
|     modify_date = False | ||||
|     if comments: | ||||
|         comments = clean_string(comments, book.id) | ||||
|     if len(book.comments): | ||||
|         if book.comments[0].text != comments: | ||||
|             book.comments[0].text = comments | ||||
|             modify_date = True | ||||
|     else: | ||||
|     if comments is not None: | ||||
|         modify_date = False | ||||
|         if comments: | ||||
|             book.comments.append(db.Comments(comment=comments, book=book.id)) | ||||
|             modify_date = True | ||||
|     return modify_date | ||||
|             comments = clean_string(comments, book.id) | ||||
|         if len(book.comments): | ||||
|             if book.comments[0].text != comments: | ||||
|                 book.comments[0].text = comments | ||||
|                 modify_date = True | ||||
|         else: | ||||
|             if comments: | ||||
|                 book.comments.append(db.Comments(comment=comments, book=book.id)) | ||||
|                 modify_date = True | ||||
|         return modify_date | ||||
|  | ||||
|  | ||||
| def edit_book_languages(languages, book, upload_mode=False, invalid=None): | ||||
|     input_languages = languages.split(',') | ||||
|     unknown_languages = [] | ||||
|     if not upload_mode: | ||||
|         input_l = isoLanguages.get_language_codes(get_locale(), input_languages, unknown_languages) | ||||
|     else: | ||||
|         input_l = isoLanguages.get_valid_language_codes(get_locale(), input_languages, unknown_languages) | ||||
|     for lang in unknown_languages: | ||||
|         log.error("'%s' is not a valid language", lang) | ||||
|         if isinstance(invalid, list): | ||||
|             invalid.append(lang) | ||||
|     if languages: | ||||
|         input_languages = languages.split(',') | ||||
|         unknown_languages = [] | ||||
|         if not upload_mode: | ||||
|             input_l = isoLanguages.get_language_codes(get_locale(), input_languages, unknown_languages) | ||||
|         else: | ||||
|             raise ValueError(_("'%(langname)s' is not a valid language", langname=lang)) | ||||
|     # ToDo: Not working correct | ||||
|     if upload_mode and len(input_l) == 1: | ||||
|         # If the language of the file is excluded from the users view, it's not imported, to allow the user to view | ||||
|         # the book it's language is set to the filter language | ||||
|         if input_l[0] != current_user.filter_language() and current_user.filter_language() != "all": | ||||
|             input_l[0] = calibre_db.session.query(db.Languages). \ | ||||
|                 filter(db.Languages.lang_code == current_user.filter_language()).first().lang_code | ||||
|     # Remove duplicates | ||||
|     input_l = helper.uniq(input_l) | ||||
|     return modify_database_object(input_l, book.languages, db.Languages, calibre_db.session, 'languages') | ||||
|             input_l = isoLanguages.get_valid_language_codes(get_locale(), input_languages, unknown_languages) | ||||
|         for lang in unknown_languages: | ||||
|             log.error("'%s' is not a valid language", lang) | ||||
|             if isinstance(invalid, list): | ||||
|                 invalid.append(lang) | ||||
|             else: | ||||
|                 raise ValueError(_("'%(langname)s' is not a valid language", langname=lang)) | ||||
|         # ToDo: Not working correct | ||||
|         if upload_mode and len(input_l) == 1: | ||||
|             # If the language of the file is excluded from the users view, it's not imported, to allow the user to view | ||||
|             # the book it's language is set to the filter language | ||||
|             if input_l[0] != current_user.filter_language() and current_user.filter_language() != "all": | ||||
|                 input_l[0] = calibre_db.session.query(db.Languages). \ | ||||
|                     filter(db.Languages.lang_code == current_user.filter_language()).first().lang_code | ||||
|         # Remove duplicates | ||||
|         input_l = helper.uniq(input_l) | ||||
|         return modify_database_object(input_l, book.languages, db.Languages, calibre_db.session, 'languages') | ||||
|     return False | ||||
|  | ||||
|  | ||||
| def edit_book_publisher(publishers, book): | ||||
|     changed = False | ||||
|     if publishers: | ||||
|         publisher = strip_whitespaces(publishers) | ||||
|         if len(book.publishers) == 0 or (len(book.publishers) > 0 and publisher != book.publishers[0].name): | ||||
|             changed |= modify_database_object([publisher], book.publishers, db.Publishers, calibre_db.session, | ||||
|                                               'publisher') | ||||
|     elif len(book.publishers): | ||||
|         changed |= modify_database_object([], book.publishers, db.Publishers, calibre_db.session, 'publisher') | ||||
|     return changed | ||||
|  | ||||
|         changed = False | ||||
|         if publishers: | ||||
|             publisher = strip_whitespaces(publishers) | ||||
|             if len(book.publishers) == 0 or (len(book.publishers) > 0 and publisher != book.publishers[0].name): | ||||
|                 changed |= modify_database_object([publisher], book.publishers, db.Publishers, calibre_db.session, | ||||
|                                                   'publisher') | ||||
|         elif len(book.publishers): | ||||
|             changed |= modify_database_object([], book.publishers, db.Publishers, calibre_db.session, 'publisher') | ||||
|         return changed | ||||
|     return False | ||||
|  | ||||
| def edit_cc_data_value(book_id, book, c, to_save, cc_db_value, cc_string): | ||||
|     changed = False | ||||
| @@ -1177,61 +1172,68 @@ def edit_cc_data(book_id, book, to_save, cc): | ||||
|     changed = False | ||||
|     for c in cc: | ||||
|         cc_string = "custom_column_" + str(c.id) | ||||
|         if not c.is_multiple: | ||||
|             if len(getattr(book, cc_string)) > 0: | ||||
|                 cc_db_value = getattr(book, cc_string)[0].value | ||||
|             else: | ||||
|                 cc_db_value = None | ||||
|             if strip_whitespaces(to_save[cc_string]): | ||||
|                 if c.datatype in ['int', 'bool', 'float', "datetime", "comments"]: | ||||
|                     change, to_save = edit_cc_data_value(book_id, book, c, to_save, cc_db_value, cc_string) | ||||
|         if to_save.get(cc_string): | ||||
|             if not c.is_multiple: | ||||
|                 if len(getattr(book, cc_string)) > 0: | ||||
|                     cc_db_value = getattr(book, cc_string)[0].value | ||||
|                 else: | ||||
|                     change, to_save = edit_cc_data_string(book, c, to_save, cc_db_value, cc_string) | ||||
|                 changed |= change | ||||
|                     cc_db_value = None | ||||
|                 if strip_whitespaces(to_save[cc_string]): | ||||
|                     if c.datatype in ['int', 'bool', 'float', "datetime", "comments"]: | ||||
|                         change, to_save = edit_cc_data_value(book_id, book, c, to_save, cc_db_value, cc_string) | ||||
|                     else: | ||||
|                         change, to_save = edit_cc_data_string(book, c, to_save, cc_db_value, cc_string) | ||||
|                     changed |= change | ||||
|                 else: | ||||
|                     if cc_db_value is not None: | ||||
|                         # remove old cc_val | ||||
|                         del_cc = getattr(book, cc_string)[0] | ||||
|                         getattr(book, cc_string).remove(del_cc) | ||||
|                         if not del_cc.books or len(del_cc.books) == 0: | ||||
|                             calibre_db.session.delete(del_cc) | ||||
|                             changed = True | ||||
|             else: | ||||
|                 if cc_db_value is not None: | ||||
|                     # remove old cc_val | ||||
|                     del_cc = getattr(book, cc_string)[0] | ||||
|                     getattr(book, cc_string).remove(del_cc) | ||||
|                     if not del_cc.books or len(del_cc.books) == 0: | ||||
|                         calibre_db.session.delete(del_cc) | ||||
|                         changed = True | ||||
|         else: | ||||
|             input_tags = to_save[cc_string].split(',') | ||||
|             input_tags = list(map(lambda it: strip_whitespaces(it), input_tags)) | ||||
|             changed |= modify_database_object(input_tags, | ||||
|                                               getattr(book, cc_string), | ||||
|                                               db.cc_classes[c.id], | ||||
|                                               calibre_db.session, | ||||
|                                               'custom') | ||||
|                 input_tags = to_save[cc_string].split(',') | ||||
|                 input_tags = list(map(lambda it: strip_whitespaces(it), input_tags)) | ||||
|                 changed |= modify_database_object(input_tags, | ||||
|                                                   getattr(book, cc_string), | ||||
|                                                   db.cc_classes[c.id], | ||||
|                                                   calibre_db.session, | ||||
|                                                   'custom') | ||||
|     return changed | ||||
|  | ||||
|  | ||||
| # returns None if no file is uploaded | ||||
| # returns False if an error occurs, in all other cases the ebook metadata is returned | ||||
| def upload_single_file(requested_file, book, book_id): | ||||
| def upload_single_file(requested_files, book, book_id): | ||||
|     # Check and handle Uploaded file | ||||
|     # requested_file = file_request.files.get('btn-upload-format', None) | ||||
|     # ToDo: Handle multiple files | ||||
|     meta = {} | ||||
|     allowed_extensions = config.config_upload_formats.split(',') | ||||
|     if requested_file: | ||||
|     for requested_file in requested_files: | ||||
|         if config.config_check_extensions and allowed_extensions != ['']: | ||||
|             if not validate_mime_type(requested_file, allowed_extensions): | ||||
|                 flash(_("File type isn't allowed to be uploaded to this server"), category="error") | ||||
|                 return False | ||||
|                 continue | ||||
|                 # return False | ||||
|         # check for empty request | ||||
|         if requested_file.filename != '': | ||||
|             if not current_user.role_upload(): | ||||
|                 flash(_("User has no rights to upload additional file formats"), category="error") | ||||
|                 return False | ||||
|                 continue | ||||
|                 # return False | ||||
|             if '.' in requested_file.filename: | ||||
|                 file_ext = requested_file.filename.rsplit('.', 1)[-1].lower() | ||||
|                 if file_ext not in allowed_extensions and '' not in allowed_extensions: | ||||
|                     flash(_("File extension '%(ext)s' is not allowed to be uploaded to this server", ext=file_ext), | ||||
|                           category="error") | ||||
|                     return False | ||||
|                     continue | ||||
|                     # return False | ||||
|             else: | ||||
|                 flash(_('File to be uploaded must have an extension'), category="error") | ||||
|                 return False | ||||
|                 continue | ||||
|                 # return False | ||||
|  | ||||
|             file_name = book.path.rsplit('/', 1)[-1] | ||||
|             filepath = os.path.normpath(os.path.join(config.get_book_path(), book.path)) | ||||
| @@ -1244,12 +1246,14 @@ def upload_single_file(requested_file, book, book_id): | ||||
|                 except OSError: | ||||
|                     flash(_("Failed to create path %(path)s (Permission denied).", path=filepath), | ||||
|                           category="error") | ||||
|                     return False | ||||
|                     continue | ||||
|                     # return False | ||||
|             try: | ||||
|                 requested_file.save(saved_filename) | ||||
|             except OSError: | ||||
|                 flash(_("Failed to store file %(file)s.", file=saved_filename), category="error") | ||||
|                 return False | ||||
|                 continue | ||||
|                 # return False | ||||
|  | ||||
|             file_size = os.path.getsize(saved_filename) | ||||
|             is_format = calibre_db.get_book_format(book_id, file_ext.upper()) | ||||
| @@ -1262,13 +1266,14 @@ def upload_single_file(requested_file, book, book_id): | ||||
|                     db_format = db.Data(book_id, file_ext.upper(), file_size, file_name) | ||||
|                     calibre_db.session.add(db_format) | ||||
|                     calibre_db.session.commit() | ||||
|                     calibre_db.update_title_sort(config) | ||||
|                     calibre_db.create_functions(config) | ||||
|                 except (OperationalError, IntegrityError, StaleDataError) as e: | ||||
|                     calibre_db.session.rollback() | ||||
|                     log.error_or_exception("Database error: {}".format(e)) | ||||
|                     flash(_("Oops! Database Error: %(error)s.", error=e.orig if hasattr(e, "orig") else e), | ||||
|                           category="error") | ||||
|                     return False  # return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|                     continue | ||||
|                     # return False  # return redirect(url_for('web.show_book', book_id=book.id)) | ||||
|  | ||||
|             # Queue uploader info | ||||
|             link = '<a href="{}">{}</a>'.format(url_for('web.show_book', book_id=book.id), escape(book.title)) | ||||
| @@ -1278,7 +1283,7 @@ def upload_single_file(requested_file, book, book_id): | ||||
|             return uploader.process( | ||||
|                 saved_filename, *os.path.splitext(requested_file.filename), | ||||
|                 rar_executable=config.config_rarfile_location) | ||||
|     return None | ||||
|     return meta if len(meta) else False | ||||
|  | ||||
|  | ||||
| def upload_cover(cover_request, book): | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Ozzie Isaacs
					Ozzie Isaacs