1
0
mirror of https://github.com/osmarks/meme-search-engine.git synced 2024-09-21 01:59:37 +00:00

Adjust index storage for memory efficiency and fix SQLite interface type confusion

This commit is contained in:
osmarks 2024-06-25 08:23:30 +01:00
parent e7adf738f6
commit 1ab254ff1d
4 changed files with 60 additions and 16 deletions

36
Cargo.lock generated
View File

@ -365,6 +365,15 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
[[package]]
name = "castaway"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a17ed5635fc8536268e5d4de1e22e81ac34419e5f052d4d51f4e01dcc263fcc"
dependencies = [
"rustversion",
]
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.98" version = "1.0.98"
@ -432,6 +441,21 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b" checksum = "3d7b894f5411737b7867f4827955924d7c254fc9f4d91a6aad6b097804b1018b"
[[package]]
name = "compact_str"
version = "0.8.0-beta"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a2dc81369dde6d31456eedbb4fd3d320f0b9713573dfe06e569e2bce7607f2"
dependencies = [
"castaway",
"cfg-if",
"itoa",
"rustversion",
"ryu",
"serde",
"static_assertions",
]
[[package]] [[package]]
name = "const-oid" name = "const-oid"
version = "0.9.6" version = "0.9.6"
@ -1483,6 +1507,7 @@ dependencies = [
"axum", "axum",
"base64 0.22.1", "base64 0.22.1",
"chrono", "chrono",
"compact_str",
"faiss", "faiss",
"fastrand", "fastrand",
"ffmpeg-the-third", "ffmpeg-the-third",
@ -1581,6 +1606,15 @@ dependencies = [
"static_assertions", "static_assertions",
] ]
[[package]]
name = "nasm-rs"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe4d98d0065f4b1daf164b3eafb11974c94662e5e2396cf03f32d0bb5c17da51"
dependencies = [
"rayon",
]
[[package]] [[package]]
name = "native-tls" name = "native-tls"
version = "0.2.11" version = "0.2.11"
@ -2093,6 +2127,7 @@ dependencies = [
"av1-grain", "av1-grain",
"bitstream-io", "bitstream-io",
"built", "built",
"cc",
"cfg-if", "cfg-if",
"interpolate_name", "interpolate_name",
"itertools", "itertools",
@ -2100,6 +2135,7 @@ dependencies = [
"libfuzzer-sys", "libfuzzer-sys",
"log", "log",
"maybe-rayon", "maybe-rayon",
"nasm-rs",
"new_debug_unreachable", "new_debug_unreachable",
"noop_proc_macro", "noop_proc_macro",
"num-derive", "num-derive",

View File

@ -8,7 +8,7 @@ edition = "2021"
[dependencies] [dependencies]
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
axum = "0.7" axum = "0.7"
image = { version = "0.25", features = ["avif", "avif-native"] } image = { version = "0.25", features = ["avif", "avif-native", "nasm"] }
reqwest = { version = "0.12", features = ["multipart"] } reqwest = { version = "0.12", features = ["multipart"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] } sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] }
@ -40,6 +40,7 @@ fastrand = "2"
mimalloc = "0.1" mimalloc = "0.1"
sonic-rs = "0.3" sonic-rs = "0.3"
ffmpeg-the-third = "2.0" ffmpeg-the-third = "2.0"
compact_str = { version = "0.8.0-beta", features = ["serde"] }
[patch.crates-io] [patch.crates-io]
image = { git = "https://github.com/fintelia/image/", branch = "upgrade-zune-jpeg" } image = { git = "https://github.com/fintelia/image/", branch = "upgrade-zune-jpeg" }
@ -50,4 +51,4 @@ path = "src/reddit_dump.rs"
[[bin]] [[bin]]
name = "video-reader" name = "video-reader"
path = "src/video_reader.rs" path = "src/video_reader.rs"

View File

@ -14,6 +14,7 @@ use axum::{
http::StatusCode http::StatusCode
}; };
use common::resize_for_embed_sync; use common::resize_for_embed_sync;
use compact_str::CompactString;
use image::RgbImage; use image::RgbImage;
use image::{imageops::FilterType, io::Reader as ImageReader, DynamicImage, ImageFormat}; use image::{imageops::FilterType, io::Reader as ImageReader, DynamicImage, ImageFormat};
use reqwest::Client; use reqwest::Client;
@ -122,7 +123,7 @@ struct RawFileRecord {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct FileRecord { struct FileRecord {
filename: String, filename: CompactString,
needs_embed: bool, needs_embed: bool,
needs_ocr: bool, needs_ocr: bool,
needs_thumbnail: bool needs_thumbnail: bool
@ -145,8 +146,8 @@ struct LoadedImage {
#[derive(Debug, Clone, Serialize, Deserialize, Hash)] #[derive(Debug, Clone, Serialize, Deserialize, Hash)]
enum Filename { enum Filename {
Actual(String), Actual(CompactString),
VideoFrame(String, u64) VideoFrame(CompactString, u64)
} }
// this is a somewhat horrible hack, but probably nobody has NUL bytes at the start of filenames? // this is a somewhat horrible hack, but probably nobody has NUL bytes at the start of filenames?
@ -154,7 +155,7 @@ impl Filename {
fn decode(buf: Vec<u8>) -> Result<Self> { fn decode(buf: Vec<u8>) -> Result<Self> {
Ok(match buf.strip_prefix(&[0]) { Ok(match buf.strip_prefix(&[0]) {
Some(remainder) => rmp_serde::from_read(&*remainder)?, Some(remainder) => rmp_serde::from_read(&*remainder)?,
None => Filename::Actual(String::from_utf8(buf)?.to_string()) None => Filename::Actual(CompactString::from_utf8(buf)?)
}) })
} }
@ -325,7 +326,7 @@ async fn ingest_files(config: Arc<WConfig>) -> Result<()> {
let to_ocr_tx = to_ocr_tx.clone(); let to_ocr_tx = to_ocr_tx.clone();
let video_lengths = video_lengths.clone(); let video_lengths = video_lengths.clone();
async move { async move {
let path = Path::new(&config.service.files).join(&record.filename); let path = Path::new(&config.service.files).join(&*record.filename);
let image: Result<Arc<DynamicImage>> = tokio::task::block_in_place(|| Ok(Arc::new(ImageReader::open(&path)?.with_guessed_format()?.decode()?))); let image: Result<Arc<DynamicImage>> = tokio::task::block_in_place(|| Ok(Arc::new(ImageReader::open(&path)?.with_guessed_format()?.decode()?)));
let image = match image { let image = match image {
Ok(image) => image, Ok(image) => image,
@ -490,7 +491,7 @@ async fn ingest_files(config: Arc<WConfig>) -> Result<()> {
let mut conn = pool.acquire().await?; let mut conn = pool.acquire().await?;
ensure_filename_record_exists(&mut conn, &filename_enc).await?; ensure_filename_record_exists(&mut conn, &filename_enc).await?;
match filename { match filename {
Filename::VideoFrame(container, _) => { video_thumb_times.write().await.insert(container.to_string(), timestamp()); }, Filename::VideoFrame(container, _) => { video_thumb_times.write().await.insert(container.clone(), timestamp()); },
_ => () _ => ()
} }
sqlx::query!( sqlx::query!(
@ -588,7 +589,7 @@ async fn ingest_files(config: Arc<WConfig>) -> Result<()> {
IMAGES_EMBEDDED_COUNTER.inc(); IMAGES_EMBEDDED_COUNTER.inc();
ensure_filename_record_exists(&mut *tx, &encoded_filename).await?; ensure_filename_record_exists(&mut *tx, &encoded_filename).await?;
match &batch[i].filename { match &batch[i].filename {
Filename::VideoFrame(container, _) => { video_embed_times.write().await.insert(container.to_string(), timestamp()); }, Filename::VideoFrame(container, _) => { video_embed_times.write().await.insert(container.clone(), timestamp()); },
_ => () _ => ()
} }
sqlx::query!( sqlx::query!(
@ -614,7 +615,7 @@ async fn ingest_files(config: Arc<WConfig>) -> Result<()> {
let entry = entry?; let entry = entry?;
let path = entry.path(); let path = entry.path();
if path.is_file() { if path.is_file() {
let filename = path.strip_prefix(&config.service.files)?.to_str().unwrap().to_string(); let filename = CompactString::from(path.strip_prefix(&config.service.files)?.to_str().unwrap());
let modtime = entry.metadata()?.modified()?.duration_since(std::time::UNIX_EPOCH)?; let modtime = entry.metadata()?.modified()?.duration_since(std::time::UNIX_EPOCH)?;
let modtime = modtime.as_micros() as i64; let modtime = modtime.as_micros() as i64;
actual_filenames.insert(filename.clone(), (path.to_path_buf(), modtime)); actual_filenames.insert(filename.clone(), (path.to_path_buf(), modtime));
@ -627,7 +628,8 @@ async fn ingest_files(config: Arc<WConfig>) -> Result<()> {
for (filename, (_path, modtime)) in actual_filenames.iter() { for (filename, (_path, modtime)) in actual_filenames.iter() {
let modtime = *modtime; let modtime = *modtime;
let record = sqlx::query_as!(RawFileRecord, "SELECT * FROM files WHERE filename = ?", filename) let filename_arr = filename.as_bytes();
let record = sqlx::query_as!(RawFileRecord, "SELECT * FROM files WHERE filename = ?", filename_arr)
.fetch_optional(&pool) .fetch_optional(&pool)
.await?; .await?;
@ -681,10 +683,14 @@ async fn ingest_files(config: Arc<WConfig>) -> Result<()> {
for filename in stored { for filename in stored {
let parsed_filename = Filename::decode(filename.clone())?; let parsed_filename = Filename::decode(filename.clone())?;
match parsed_filename { match parsed_filename {
Filename::Actual(s) => if !actual_filenames.contains_key(&s) { Filename::Actual(s) => {
sqlx::query!("DELETE FROM files WHERE filename = ?", s) let s = &*s;
.execute(&mut *tx) let raw = &filename;
.await?; if !actual_filenames.contains_key(s) {
sqlx::query!("DELETE FROM files WHERE filename = ?", raw)
.execute(&mut *tx)
.await?;
}
}, },
// This might fail in some cases where for whatever reason a video is replaced with a file of the same name which is not a video. Don't do that. // This might fail in some cases where for whatever reason a video is replaced with a file of the same name which is not a video. Don't do that.
Filename::VideoFrame(container, frame) => if !actual_filenames.contains_key(&container) { Filename::VideoFrame(container, frame) => if !actual_filenames.contains_key(&container) {
@ -704,6 +710,7 @@ async fn ingest_files(config: Arc<WConfig>) -> Result<()> {
for container_filename in video_lengths.keys() { for container_filename in video_lengths.keys() {
let embed_time = video_embed_times.get(container_filename); let embed_time = video_embed_times.get(container_filename);
let thumb_time = video_thumb_times.get(container_filename); let thumb_time = video_thumb_times.get(container_filename);
let container_filename: &[u8] = container_filename.as_bytes();
sqlx::query!("INSERT OR REPLACE INTO files (filename, embedding_time, thumbnail_time) VALUES (?, ?, ?)", container_filename, embed_time, thumb_time) sqlx::query!("INSERT OR REPLACE INTO files (filename, embedding_time, thumbnail_time) VALUES (?, ?, ?)", container_filename, embed_time, thumb_time)
.execute(&mut *tx) .execute(&mut *tx)
.await?; .await?;

View File

@ -2,7 +2,7 @@ extern crate ffmpeg_the_third as ffmpeg;
use anyhow::{Result, Context}; use anyhow::{Result, Context};
use image::RgbImage; use image::RgbImage;
use std::env; use std::env;
use ffmpeg::{codec, filter, format::{self, Pixel}, media::Type, util::frame::video::Video, software::scaling}; use ffmpeg::{codec, filter, format::{self, Pixel}, media::Type, util::frame::video::Video};
const BYTES_PER_PIXEL: usize = 3; const BYTES_PER_PIXEL: usize = 3;