1
0
mirror of https://github.com/osmarks/meme-search-engine.git synced 2026-05-08 00:31:22 +00:00

updates & minor fixes

This commit is contained in:
osmarks
2025-11-07 16:59:25 +00:00
parent a1bf23055e
commit 00afb01e7a
4 changed files with 1794 additions and 856 deletions

2568
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -34,14 +34,14 @@ serde_bytes = "0.11"
tower-http = { version = "0.5", features = ["cors"] }
tower = "0.4"
json5 = "0.4"
prometheus = "0.13"
prometheus = { version = "0.13", features = ["process"] }
lazy_static = "1"
zstd = "0.13"
url = "2"
fastrand = "2"
mimalloc = "0.1"
sonic-rs = "0.3"
ffmpeg-the-third = "2.0"
ffmpeg-the-third = "4.0"
compact_str = { version = "0.8.0-beta", features = ["serde"] }
itertools = "0.13"
async-recursion = "1"
@@ -58,7 +58,7 @@ simsimd = "6"
foldhash = "0.1"
memmap2 = "0.9"
candle-core = "0.8"
monoio = "0.2"
monoio = { version = "0.2", features = ["sync"] }
hyper = "1"
monoio-compat = { version = "0.2", features = ["hyper"] }
http-body-util = "0.1"

View File

@@ -1,6 +1,6 @@
use anyhow::{Context, Result};
use lazy_static::lazy_static;
use monoio::fs;
use monoio::{blocking::DefaultThreadPool, fs};
use std::path::PathBuf;
use base64::Engine;
use argh::FromArgs;
@@ -48,7 +48,9 @@ struct CLIArguments {
#[argh(option, short='c', description="server config file")]
config_path: Option<String>,
#[argh(switch, short='l', description="lock memory")]
lock_memory: bool
lock_memory: bool,
#[argh(option, short='T', description="number of threads to use")]
threads: Option<usize>
}
#[derive(Deserialize, Clone)]
@@ -478,29 +480,35 @@ impl hyper::service::Service<Request<Incoming>> for Service {
QUERIES_COUNTER.inc();
let n_visited = scratch.visited_list.len();
let n_dims = index.header.quantizer.n_dims;
let visited_embeddings = std::mem::replace(&mut scratch.visited_embeddings, vec![]);
let mut similarities_against_self = vec![0.0f32; n_visited * n_visited];
let similarities_against_self = monoio::spawn_blocking(move || {
let mut similarities_against_self = vec![0.0f32; n_visited * n_visited];
// runtime deduplicate of results list
unsafe {
// vecs @ vecs.T
matrixmultiply::sgemm(
n_visited,
index.header.quantizer.n_dims,
n_visited,
1.0,
scratch.visited_embeddings.as_ptr(),
index.header.quantizer.n_dims as isize,
1,
scratch.visited_embeddings.as_ptr(),
1,
index.header.quantizer.n_dims as isize,
0.0,
similarities_against_self.as_mut_ptr(),
n_visited as isize,
1
);
}
// runtime deduplication of results list
unsafe {
// vecs @ vecs.T
matrixmultiply::sgemm(
n_visited,
n_dims,
n_visited,
1.0,
visited_embeddings.as_ptr(),
n_dims as isize,
1,
visited_embeddings.as_ptr(),
1,
n_dims as isize,
0.0,
similarities_against_self.as_mut_ptr(),
n_visited as isize,
1
);
}
similarities_against_self
}).await.map_err(|e| anyhow::anyhow!("threadpool error: {:?}", e))?;
// discard anything similar to something already in list
let mut i = 0;
@@ -709,11 +717,15 @@ fn main() -> Result<()> {
if args.config_path.is_some() {
let mut join_handles = vec![];
for _ in 0..num_cpus::get() {
for _ in 0..args.threads.unwrap_or(num_cpus::get()) {
let args_ = args.clone();
let maps_ = maps.clone();
let handle = std::thread::spawn(move || {
let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new().enable_timer().build().unwrap();
let pool = DefaultThreadPool::new(num_cpus::get());
let mut rt = monoio::RuntimeBuilder::<monoio::IoUringDriver>::new()
.attach_thread_pool(Box::new(pool))
.enable_timer()
.build().unwrap();
let index = rt.block_on(initialize_index(args_.clone(), maps_))?;
rt.block_on(serve(args_, index))
});

View File

@@ -25,8 +25,8 @@ pub fn run<P: AsRef<std::path::Path>, F: FnMut(RgbImage) -> Result<()>>(path: P,
aspect_ratio = (1, 1);
}
graph.add(&filter::find("buffer").unwrap(), "in",
&format!("video_size={}x{}:pix_fmt={}:time_base={}/{}:pixel_aspect={}/{}", decoder.width(), decoder.height(), decoder.format().descriptor().unwrap().name(), video.time_base().0, video.time_base().1, aspect_ratio.0, aspect_ratio.1))?;
graph.add(&filter::find("buffer").unwrap(), "in",
&format!("video_size={}x{}:pix_fmt={}:time_base={}/{}:pixel_aspect={}/{}", decoder.width(), decoder.height(), decoder.format().descriptor().context("invalid format")?.name(), video.time_base().0, video.time_base().1, aspect_ratio.0, aspect_ratio.1))?;
graph.add(&filter::find("buffersink").unwrap(), "out", "")?;
// I don't know exactly where, but some of my videos apparently have the size vary throughout them.
// This causes horrible segfaults somewhere.
@@ -45,10 +45,10 @@ pub fn run<P: AsRef<std::path::Path>, F: FnMut(RgbImage) -> Result<()>>(path: P,
if !decoder.receive_frame(&mut decoded).is_ok() { break }
let mut in_ctx = filter_graph.get("in").unwrap();
// The filters really do not like
// The filters really do not like
let mut src = in_ctx.source();
src.add(&decoded).context("add frame")?;
while filter_graph.get("out").unwrap().sink().frame(&mut filtered).is_ok() {
let mut image = vec![0u8; filtered.width() as usize * filtered.height() as usize * BYTES_PER_PIXEL];
let stride = filtered.stride(0);
@@ -56,7 +56,9 @@ pub fn run<P: AsRef<std::path::Path>, F: FnMut(RgbImage) -> Result<()>>(path: P,
let width = filtered.width() as usize * BYTES_PER_PIXEL;
let height = filtered.height() as usize;
for y in 0..height {
image[y * width .. (y + 1) * width].copy_from_slice(&data[y * stride .. y * stride + width]);
if let Some(subslice) = data.get(y * stride .. y * stride + width) {
image[y * width .. (y + 1) * width].copy_from_slice(subslice);
}
}
frame_callback(image::ImageBuffer::from_vec(filtered.width(), filtered.height(), image).unwrap())?;
}
@@ -84,4 +86,4 @@ fn main() -> Result<()> {
Ok(())
};
run(&env::args().nth(1).unwrap(), callback, 1.0)
}
}