1
0
mirror of https://github.com/osmarks/meme-search-engine.git synced 2024-09-21 10:09:36 +00:00

basic monitoring implementation

This commit is contained in:
osmarks 2024-05-22 18:49:32 +01:00
parent ce590298a7
commit ffc3d648a6
3 changed files with 62 additions and 1 deletions

23
Cargo.lock generated
View File

@ -1378,10 +1378,12 @@ dependencies = [
"half",
"image",
"json5",
"lazy_static",
"log",
"ndarray",
"num_cpus",
"pretty_env_logger",
"prometheus",
"regex",
"reqwest",
"rmp-serde",
@ -1868,6 +1870,27 @@ dependencies = [
"syn 2.0.65",
]
[[package]]
name = "prometheus"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"thiserror",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "qoi"
version = "0.4.1"

View File

@ -31,4 +31,6 @@ num_cpus = "1"
serde_bytes = "0.11"
tower-http = { version = "0.5", features = ["cors"] }
tower = "0.4"
json5 = "0.4"
json5 = "0.4"
prometheus = "0.13"
lazy_static = "1"

View File

@ -25,11 +25,27 @@ use futures_util::stream::{StreamExt, TryStreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tower_http::cors::CorsLayer;
use faiss::index::scalar_quantizer;
use lazy_static::lazy_static;
use prometheus::{register_int_counter, register_int_counter_vec, register_int_gauge, Encoder, IntCounter, IntGauge, IntCounterVec};
mod ocr;
use crate::ocr::scan_image;
lazy_static! {
static ref RELOADS_COUNTER: IntCounter = register_int_counter!("mse_reloads", "reloads executed").unwrap();
static ref QUERIES_COUNTER: IntCounter = register_int_counter!("mse_queries", "queries executed").unwrap();
static ref TERMS_COUNTER: IntCounterVec = register_int_counter_vec!("mse_terms", "terms used in queries, by type", &["type"]).unwrap();
static ref IMAGES_LOADED_COUNTER: IntCounter = register_int_counter!("mse_loads", "images loaded by ingest process").unwrap();
static ref IMAGES_LOADED_ERROR_COUNTER: IntCounter = register_int_counter!("mse_load_errors", "image load fails by ingest process").unwrap();
static ref IMAGES_EMBEDDED_COUNTER: IntCounter = register_int_counter!("mse_embeds", "images embedded by ingest process").unwrap();
static ref IMAGES_OCRED_COUNTER: IntCounter = register_int_counter!("mse_ocrs", "images OCRed by ingest process").unwrap();
static ref IMAGES_OCRED_ERROR_COUNTER: IntCounter = register_int_counter!("mse_ocr_errors", "image OCR fails by ingest process").unwrap();
static ref IMAGES_THUMBNAILED_COUNTER: IntCounter = register_int_counter!("mse_thumbnails", "images thumbnailed by ingest process").unwrap();
static ref THUMBNAILS_GENERATED_COUNTER: IntCounterVec = register_int_counter_vec!("mse_thumbnail_outputs", "thumbnails produced by ingest process", &["output_format"]).unwrap();
static ref LAST_INDEX_SIZE: IntGauge = register_int_gauge!("mse_index_size", "images in loaded index").unwrap();
}
fn function_which_returns_50() -> usize { 50 }
#[derive(Debug, Deserialize, Clone)]
@ -291,9 +307,11 @@ async fn ingest_files(config: Arc<Config>, backend: Arc<InferenceServerConfig>)
Ok(image) => image,
Err(e) => {
log::error!("Could not read {}: {}", record.filename, e);
IMAGES_LOADED_ERROR_COUNTER.inc();
return Ok(())
}
};
IMAGES_LOADED_COUNTER.inc();
if record.embedding.is_none() {
let resized = resize_for_embed(backend.clone(), image.clone()).await?;
@ -394,11 +412,13 @@ async fn ingest_files(config: Arc<Config>, backend: Arc<InferenceServerConfig>)
format_config,
),
);
THUMBNAILS_GENERATED_COUNTER.get_metric_with_label_values(&[format_name]).unwrap().inc();
std::fs::write(thumbnail_path, resized)?;
}
}
Ok::<Vec<String>, anyhow::Error>(generated_formats)
}).await??;
IMAGES_THUMBNAILED_COUNTER.inc();
let formats_data = rmp_serde::to_vec(&generated_formats)?;
let ts = timestamp();
sqlx::query!(
@ -431,10 +451,12 @@ async fn ingest_files(config: Arc<Config>, backend: Arc<InferenceServerConfig>)
let scan = match scan_image(&client, &image.image).await {
Ok(scan) => scan,
Err(e) => {
IMAGES_OCRED_ERROR_COUNTER.inc();
log::error!("OCR failure {}: {}", image.filename, e);
return Ok(())
}
};
IMAGES_OCRED_COUNTER.inc();
let ocr_text = scan
.iter()
.map(|segment| segment.text.clone())
@ -484,6 +506,7 @@ async fn ingest_files(config: Arc<Config>, backend: Arc<InferenceServerConfig>)
for (i, vector) in result.into_iter().enumerate() {
let vector = vector.into_vec();
log::debug!("embedded {}", batch[i].filename);
IMAGES_EMBEDDED_COUNTER.inc();
sqlx::query!(
"UPDATE files SET embedding_time = ?, embedding = ? WHERE filename = ?",
ts,
@ -721,16 +744,19 @@ async fn handle_request(
for term in &req.terms {
if let Some(image) = &term.image {
TERMS_COUNTER.get_metric_with_label_values(&["image"]).unwrap().inc();
let bytes = BASE64_STANDARD.decode(image)?;
let image = Arc::new(tokio::task::block_in_place(|| image::load_from_memory(&bytes))?);
image_batch.push(serde_bytes::ByteBuf::from(resize_for_embed(backend_config.clone(), image).await?));
image_weights.push(term.weight.unwrap_or(1.0));
}
if let Some(text) = &term.text {
TERMS_COUNTER.get_metric_with_label_values(&["text"]).unwrap().inc();
text_batch.push(text.clone());
text_weights.push(term.weight.unwrap_or(1.0));
}
if let Some(embedding) = &term.embedding {
TERMS_COUNTER.get_metric_with_label_values(&["embedding"]).unwrap().inc();
let weight = term.weight.unwrap_or(1.0);
for (i, value) in embedding.iter().enumerate() {
total_embedding[i] += value * weight;
@ -825,6 +851,7 @@ async fn main() -> Result<()> {
Ok(_) => {
match build_index(config.clone(), backend.clone()).await {
Ok(new_index) => {
LAST_INDEX_SIZE.set(new_index.vectors.ntotal() as i64);
*index.write().await = new_index;
}
Err(e) => {
@ -839,6 +866,7 @@ async fn main() -> Result<()> {
}
}
ingest_done_tx.send((true, format!("OK"))).unwrap();
RELOADS_COUNTER.inc();
request_ingest_rx.recv().await;
}
}
@ -854,6 +882,7 @@ async fn main() -> Result<()> {
let backend_config = backend.clone();
let index = index.read().await; // TODO: use ConcurrentIndex here
let client = client.clone();
QUERIES_COUNTER.inc();
handle_request(&config, backend_config, client.clone(), &index, req).await.map_err(|e| format!("{:?}", e))
}))
.route("/", get(|_req: axum::http::Request<axum::body::Body>| async move {
@ -881,6 +910,13 @@ async fn main() -> Result<()> {
}
}
}))
.route("/metrics", get(|_req: axum::http::Request<axum::body::Body>| async move {
let mut buffer = Vec::new();
let encoder = prometheus::TextEncoder::new();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
buffer
}))
.layer(cors);
let addr = format!("0.0.0.0:{}", config_.port);