mirror of
				https://github.com/janeczku/calibre-web
				synced 2025-10-31 07:13:02 +00:00 
			
		
		
		
	Fix deprecation warning datetime
Started metadata parsing for audio files
This commit is contained in:
		
							
								
								
									
										79
									
								
								cps/audio.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										79
									
								
								cps/audio.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,79 @@ | ||||
| # -*- coding: utf-8 -*- | ||||
|  | ||||
| #  This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web) | ||||
| #    Copyright (C) 2024 Ozzieisaacs | ||||
| # | ||||
| #  This program is free software: you can redistribute it and/or modify | ||||
| #  it under the terms of the GNU General Public License as published by | ||||
| #  the Free Software Foundation, either version 3 of the License, or | ||||
| #  (at your option) any later version. | ||||
| # | ||||
| #  This program is distributed in the hope that it will be useful, | ||||
| #  but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| #  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| #  GNU General Public License for more details. | ||||
| # | ||||
| #  You should have received a copy of the GNU General Public License | ||||
| #  along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
|  | ||||
| import os | ||||
|  | ||||
| import mutagen | ||||
| import base64 | ||||
|  | ||||
| from cps.constants import BookMeta | ||||
|  | ||||
|  | ||||
| def get_audio_file_info(tmp_file_path, original_file_extension, original_file_name): | ||||
|     tmp_cover_name = None | ||||
|     audio_file = mutagen.File(tmp_file_path) | ||||
|     if original_file_extension in [".mp3", ".wav"]: | ||||
|         title = audio_file.tags.get('TIT2').text[0] if "TIT2" in audio_file.tags else None | ||||
|         author = audio_file.tags.get('TPE1').text[0] if "TPE1" in audio_file.tags else None | ||||
|         if author is None: | ||||
|             author = audio_file.tags.get('TPE2').text[0] if "TPE2" in audio_file.tags else None | ||||
|         comments = audio_file.tags.get('COMM').text[0] if "COMM" in audio_file.tags else None | ||||
|         tags = audio_file.tags.get('TCON').text[0] if "TCON" in audio_file.tags else None # Genre | ||||
|         series = audio_file.tags.get('TALB').text[0] if "TALB" in audio_file.tags else None# Album | ||||
|         series_id = audio_file.tags.get('TRCK').text[0] if "TRCK" in audio_file.tags else None # track no. | ||||
|         publisher = audio_file.tags.get('TPUB').text[0] if "TPUB" in audio_file.tags else None | ||||
|         pubdate = audio_file.tags.get('XDOR').text[0] if "XDOR" in audio_file.tags else None | ||||
|         cover_data = audio_file.tags.get('APIC:') | ||||
|         if cover_data: | ||||
|             tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') | ||||
|             with open(tmp_cover_name, "wb") as cover_file: | ||||
|                 cover_file.write(cover_data.data) | ||||
|     elif original_file_extension in [".ogg", ".flac"]: | ||||
|         title = audio_file.tags.get('TITLE')[0] if "TITLE" in audio_file else None | ||||
|         author = audio_file.tags.get('ARTIST')[0] if "ARTIST" in audio_file else None | ||||
|         comments = None # audio_file.tags.get('COMM', None) | ||||
|         tags = "" | ||||
|         series = audio_file.tags.get('ALBUM')[0] if "ALBUM" in audio_file else None | ||||
|         series_id = audio_file.tags.get('TRACKNUMBER')[0] if "TRACKNUMBER" in audio_file else None | ||||
|         publisher = audio_file.tags.get('LABEL')[0] if "LABEL" in audio_file else None | ||||
|         pubdate = audio_file.tags.get('DATE')[0] if "DATE" in audio_file else None | ||||
|         cover_data = audio_file.tags.get('METADATA_BLOCK_PICTURE') | ||||
|         if cover_data: | ||||
|             tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') | ||||
|             with open(tmp_cover_name, "wb") as cover_file: | ||||
|                 cover_file.write(mutagen.flac.Picture(base64.b64decode(cover_data[0])).data) | ||||
|         if hasattr(audio_file, "pictures"): | ||||
|             tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') | ||||
|             with open(tmp_cover_name, "wb") as cover_file: | ||||
|                 cover_file.write(audio_file.pictures[0].data) | ||||
|  | ||||
|     return BookMeta( | ||||
|         file_path=tmp_file_path, | ||||
|         extension=original_file_extension, | ||||
|         title=title or original_file_name , | ||||
|         author="Unknown" if author is None else author, | ||||
|         cover=tmp_cover_name, | ||||
|         description="" if comments is None else comments, | ||||
|         tags="" if tags is None else tags, | ||||
|         series="" if series is None else series, | ||||
|         series_id="1" if series_id is None else series_id.split("/")[0], | ||||
|         languages="", | ||||
|         publisher= "" if publisher is None else publisher, | ||||
|         pubdate="" if pubdate is None else pubdate, | ||||
|         identifiers=[], | ||||
|     ) | ||||
| @@ -1,4 +1,4 @@ | ||||
| from datetime import datetime | ||||
| from datetime import datetime, UTC | ||||
| from datetime import timedelta | ||||
| import hashlib | ||||
|  | ||||
| @@ -496,7 +496,7 @@ class LoginManager: | ||||
|             duration = timedelta(seconds=duration) | ||||
|  | ||||
|         try: | ||||
|             expires = datetime.utcnow() + duration | ||||
|             expires = datetime.now(UTC) + duration | ||||
|         except TypeError as e: | ||||
|             raise Exception( | ||||
|                 "REMEMBER_COOKIE_DURATION must be a datetime.timedelta," | ||||
|   | ||||
| @@ -200,7 +200,7 @@ def edit_book(book_id): | ||||
|             book.pubdate = db.Books.DEFAULT_PUBDATE | ||||
|  | ||||
|         if modify_date: | ||||
|             book.last_modified = datetime.utcnow() | ||||
|             book.last_modified = datetime.now(UTC) | ||||
|             kobo_sync_status.remove_synced_book(edited_books_id, all=True) | ||||
|             calibre_db.set_metadata_dirty(book.id) | ||||
|  | ||||
| @@ -440,7 +440,7 @@ def edit_list_book(param): | ||||
|                                mimetype='application/json') | ||||
|         else: | ||||
|             return _("Parameter not found"), 400 | ||||
|         book.last_modified = datetime.utcnow() | ||||
|         book.last_modified = datetime.now(UTC) | ||||
|  | ||||
|         calibre_db.session.commit() | ||||
|         # revert change for sort if automatic fields link is deactivated | ||||
| @@ -556,7 +556,7 @@ def table_xchange_author_title(): | ||||
|                 # toDo: Handle error | ||||
|                 edit_error = helper.update_dir_structure(edited_books_id, config.get_book_path(), input_authors[0]) | ||||
|             if modify_date: | ||||
|                 book.last_modified = datetime.utcnow() | ||||
|                 book.last_modified = datetime.now(UTC) | ||||
|                 calibre_db.set_metadata_dirty(book.id) | ||||
|             try: | ||||
|                 calibre_db.session.commit() | ||||
|   | ||||
| @@ -25,7 +25,7 @@ import re | ||||
| import regex | ||||
| import shutil | ||||
| import socket | ||||
| from datetime import datetime, timedelta | ||||
| from datetime import datetime, timedelta, UTC | ||||
| import requests | ||||
| import unidecode | ||||
| from uuid import uuid4 | ||||
| @@ -793,7 +793,7 @@ def get_book_cover_thumbnail(book, resolution): | ||||
|             .filter(ub.Thumbnail.type == THUMBNAIL_TYPE_COVER) \ | ||||
|             .filter(ub.Thumbnail.entity_id == book.id) \ | ||||
|             .filter(ub.Thumbnail.resolution == resolution) \ | ||||
|             .filter(or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.utcnow())) \ | ||||
|             .filter(or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.now(UTC)) \ | ||||
|             .first() | ||||
|  | ||||
|  | ||||
| @@ -832,7 +832,7 @@ def get_series_thumbnail(series_id, resolution): | ||||
|         .filter(ub.Thumbnail.type == THUMBNAIL_TYPE_SERIES) \ | ||||
|         .filter(ub.Thumbnail.entity_id == series_id) \ | ||||
|         .filter(ub.Thumbnail.resolution == resolution) \ | ||||
|         .filter(or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.utcnow())) \ | ||||
|         .filter(or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.now(UTC))) \ | ||||
|         .first() | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -18,7 +18,7 @@ | ||||
| #  along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
|  | ||||
| import base64 | ||||
| import datetime | ||||
| from datetime import datetime, UTC | ||||
| import os | ||||
| import uuid | ||||
| import zipfile | ||||
| @@ -131,7 +131,7 @@ def convert_to_kobo_timestamp_string(timestamp): | ||||
|         return timestamp.strftime("%Y-%m-%dT%H:%M:%SZ") | ||||
|     except AttributeError as exc: | ||||
|         log.debug("Timestamp not valid: {}".format(exc)) | ||||
|         return datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") | ||||
|         return datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") | ||||
|  | ||||
|  | ||||
| @kobo.route("/v1/library/sync") | ||||
| @@ -375,7 +375,7 @@ def create_book_entitlement(book, archived): | ||||
|     book_uuid = str(book.uuid) | ||||
|     return { | ||||
|         "Accessibility": "Full", | ||||
|         "ActivePeriod": {"From": convert_to_kobo_timestamp_string(datetime.datetime.utcnow())}, | ||||
|         "ActivePeriod": {"From": convert_to_kobo_timestamp_string(datetime.now(UTC))}, | ||||
|         "Created": convert_to_kobo_timestamp_string(book.timestamp), | ||||
|         "CrossRevisionId": book_uuid, | ||||
|         "Id": book_uuid, | ||||
| @@ -795,7 +795,7 @@ def HandleStateRequest(book_uuid): | ||||
|                 if new_book_read_status == ub.ReadBook.STATUS_IN_PROGRESS \ | ||||
|                         and new_book_read_status != book_read.read_status: | ||||
|                     book_read.times_started_reading += 1 | ||||
|                     book_read.last_time_started_reading = datetime.datetime.utcnow() | ||||
|                     book_read.last_time_started_reading = datetime.datetime.now(UTC) | ||||
|                 book_read.read_status = new_book_read_status | ||||
|                 update_results_response["StatusInfoResult"] = {"Result": "Success"} | ||||
|         except (KeyError, TypeError, ValueError, StatementError): | ||||
|   | ||||
| @@ -19,7 +19,7 @@ | ||||
|  | ||||
| from .cw_login import current_user | ||||
| from . import ub | ||||
| import datetime | ||||
| from datetime import datetime, UTC | ||||
| from sqlalchemy.sql.expression import or_, and_, true | ||||
| # from sqlalchemy import exc | ||||
|  | ||||
| @@ -58,7 +58,7 @@ def change_archived_books(book_id, state=None, message=None): | ||||
|         archived_book = ub.ArchivedBook(user_id=current_user.id, book_id=book_id) | ||||
|  | ||||
|     archived_book.is_archived = state if state else not archived_book.is_archived | ||||
|     archived_book.last_modified = datetime.datetime.utcnow()        # toDo. Check utc timestamp | ||||
|     archived_book.last_modified = datetime.now(UTC)        # toDo. Check utc timestamp | ||||
|  | ||||
|     ub.session.merge(archived_book) | ||||
|     ub.session_commit(message) | ||||
|   | ||||
							
								
								
									
										10
									
								
								cps/shelf.py
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								cps/shelf.py
									
									
									
									
									
								
							| @@ -21,7 +21,7 @@ | ||||
| #  along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
|  | ||||
| import sys | ||||
| from datetime import datetime | ||||
| from datetime import datetime, UTC | ||||
|  | ||||
| from flask import Blueprint, flash, redirect, request, url_for, abort | ||||
| from flask_babel import gettext as _ | ||||
| @@ -80,7 +80,7 @@ def add_to_shelf(shelf_id, book_id): | ||||
|         return "%s is a invalid Book Id. Could not be added to Shelf" % book_id, 400 | ||||
|  | ||||
|     shelf.books.append(ub.BookShelf(shelf=shelf.id, book_id=book_id, order=maxOrder + 1)) | ||||
|     shelf.last_modified = datetime.utcnow() | ||||
|     shelf.last_modified = datetime.now(UTC) | ||||
|     try: | ||||
|         ub.session.merge(shelf) | ||||
|         ub.session.commit() | ||||
| @@ -139,7 +139,7 @@ def search_to_shelf(shelf_id): | ||||
|         for book in books_for_shelf: | ||||
|             maxOrder += 1 | ||||
|             shelf.books.append(ub.BookShelf(shelf=shelf.id, book_id=book, order=maxOrder)) | ||||
|         shelf.last_modified = datetime.utcnow() | ||||
|         shelf.last_modified = datetime.now(UTC) | ||||
|         try: | ||||
|             ub.session.merge(shelf) | ||||
|             ub.session.commit() | ||||
| @@ -185,7 +185,7 @@ def remove_from_shelf(shelf_id, book_id): | ||||
|  | ||||
|         try: | ||||
|             ub.session.delete(book_shelf) | ||||
|             shelf.last_modified = datetime.utcnow() | ||||
|             shelf.last_modified = datetime.now(UTC) | ||||
|             ub.session.commit() | ||||
|         except (OperationalError, InvalidRequestError) as e: | ||||
|             ub.session.rollback() | ||||
| @@ -271,7 +271,7 @@ def order_shelf(shelf_id): | ||||
|             for book in books_in_shelf: | ||||
|                 setattr(book, 'order', to_save[str(book.book_id)]) | ||||
|                 counter += 1 | ||||
|                 # if order different from before -> shelf.last_modified = datetime.utcnow() | ||||
|                 # if order different from before -> shelf.last_modified = datetime.now(UTC) | ||||
|             try: | ||||
|                 ub.session.commit() | ||||
|             except (OperationalError, InvalidRequestError) as e: | ||||
|   | ||||
| @@ -123,7 +123,7 @@ class TaskGenerateCoverThumbnails(CalibreTask): | ||||
|             .query(ub.Thumbnail) \ | ||||
|             .filter(ub.Thumbnail.type == constants.THUMBNAIL_TYPE_COVER) \ | ||||
|             .filter(ub.Thumbnail.entity_id == book_id) \ | ||||
|             .filter(or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.utcnow())) \ | ||||
|             .filter(or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.now(UTC))) \ | ||||
|             .all() | ||||
|  | ||||
|     def create_book_cover_thumbnails(self, book): | ||||
| @@ -165,7 +165,7 @@ class TaskGenerateCoverThumbnails(CalibreTask): | ||||
|             self.app_db_session.rollback() | ||||
|  | ||||
|     def update_book_cover_thumbnail(self, book, thumbnail): | ||||
|         thumbnail.generated_at = datetime.utcnow() | ||||
|         thumbnail.generated_at = datetime.now(UTC) | ||||
|  | ||||
|         try: | ||||
|             self.app_db_session.commit() | ||||
| @@ -326,7 +326,7 @@ class TaskGenerateSeriesThumbnails(CalibreTask): | ||||
|             .query(ub.Thumbnail) \ | ||||
|             .filter(ub.Thumbnail.type == constants.THUMBNAIL_TYPE_SERIES) \ | ||||
|             .filter(ub.Thumbnail.entity_id == series_id) \ | ||||
|             .filter(or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.utcnow())) \ | ||||
|             .filter(or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.now(UTC))) \ | ||||
|             .all() | ||||
|  | ||||
|     def create_series_thumbnail(self, series, series_books, resolution): | ||||
| @@ -346,7 +346,7 @@ class TaskGenerateSeriesThumbnails(CalibreTask): | ||||
|             self.app_db_session.rollback() | ||||
|  | ||||
|     def update_series_thumbnail(self, series_books, thumbnail): | ||||
|         thumbnail.generated_at = datetime.utcnow() | ||||
|         thumbnail.generated_at = datetime.now(UTC) | ||||
|  | ||||
|         try: | ||||
|             self.app_db_session.commit() | ||||
|   | ||||
| @@ -20,7 +20,7 @@ | ||||
| import atexit | ||||
| import os | ||||
| import sys | ||||
| import datetime | ||||
| from datetime import datetime, UTC | ||||
| import itertools | ||||
| import uuid | ||||
| from flask import session as flask_session | ||||
| @@ -495,11 +495,11 @@ def receive_before_flush(session, flush_context, instances): | ||||
|     for change in itertools.chain(session.new, session.dirty): | ||||
|         if isinstance(change, (ReadBook, KoboStatistics, KoboBookmark)): | ||||
|             if change.kobo_reading_state: | ||||
|                 change.kobo_reading_state.last_modified = datetime.datetime.utcnow() | ||||
|                 change.kobo_reading_state.last_modified = datetime.now(UTC) | ||||
|     # Maintain the last_modified bit for the Shelf table. | ||||
|     for change in itertools.chain(session.new, session.deleted): | ||||
|         if isinstance(change, BookShelf): | ||||
|             change.ub_shelf.last_modified = datetime.datetime.utcnow() | ||||
|             change.ub_shelf.last_modified = datetime.now(UTC) | ||||
|  | ||||
|  | ||||
| # Baseclass representing Downloads from calibre-web in app.db | ||||
| @@ -563,7 +563,7 @@ class Thumbnail(Base): | ||||
|     type = Column(SmallInteger, default=constants.THUMBNAIL_TYPE_COVER) | ||||
|     resolution = Column(SmallInteger, default=constants.COVER_THUMBNAIL_SMALL) | ||||
|     filename = Column(String, default=filename) | ||||
|     generated_at = Column(DateTime, default=lambda: datetime.datetime.utcnow()) | ||||
|     generated_at = Column(DateTime, default=lambda: datetime.now(UTC)) | ||||
|     expiration = Column(DateTime, nullable=True) | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -68,6 +68,13 @@ except ImportError as e: | ||||
|     log.debug('Cannot import fb2, extracting fb2 metadata will not work: %s', e) | ||||
|     use_fb2_meta = False | ||||
|  | ||||
| try: | ||||
|     from . import audio | ||||
|     use_audio_meta = True | ||||
| except ImportError as e: | ||||
|     log.debug('Cannot import mutagen, extracting audio metadata will not work: %s', e) | ||||
|     use_audio_meta = False | ||||
|  | ||||
|  | ||||
| def process(tmp_file_path, original_file_name, original_file_extension, rar_executable): | ||||
|     meta = default_meta(tmp_file_path, original_file_name, original_file_extension) | ||||
| @@ -84,6 +91,8 @@ def process(tmp_file_path, original_file_name, original_file_extension, rar_exec | ||||
|                                         original_file_name, | ||||
|                                         original_file_extension, | ||||
|                                         rar_executable) | ||||
|         elif extension_upper in [".MP3", ".OGG", ".FLAC", ".WAV"] and use_audio_meta: | ||||
|             meta = audio.get_audio_file_info(tmp_file_path, original_file_extension, original_file_name) | ||||
|     except Exception as ex: | ||||
|         log.warning('cannot parse metadata, using default: %s', ex) | ||||
|  | ||||
|   | ||||
| @@ -36,6 +36,7 @@ python-dateutil>=2.1,<2.10.0 | ||||
| beautifulsoup4>=4.0.1,<4.13.0 | ||||
| faust-cchardet>=2.1.18,<2.1.20 | ||||
| py7zr>=0.15.0,<0.21.0 | ||||
| mutagen>=1.40.0,<1.50.0 | ||||
|  | ||||
| # Comics | ||||
| natsort>=2.2.0,<8.5.0 | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Ozzie Isaacs
					Ozzie Isaacs