From ffc3d648a6aed5ef488ad7cb29481ed769e1edd6 Mon Sep 17 00:00:00 2001 From: osmarks Date: Wed, 22 May 2024 18:49:32 +0100 Subject: [PATCH] basic monitoring implementation --- Cargo.lock | 23 +++++++++++++++++++++++ Cargo.toml | 4 +++- src/main.rs | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 0457eae..1399cdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1378,10 +1378,12 @@ dependencies = [ "half", "image", "json5", + "lazy_static", "log", "ndarray", "num_cpus", "pretty_env_logger", + "prometheus", "regex", "reqwest", "rmp-serde", @@ -1868,6 +1870,27 @@ dependencies = [ "syn 2.0.65", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "qoi" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index dcc064e..10311e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,4 +31,6 @@ num_cpus = "1" serde_bytes = "0.11" tower-http = { version = "0.5", features = ["cors"] } tower = "0.4" -json5 = "0.4" \ No newline at end of file +json5 = "0.4" +prometheus = "0.13" +lazy_static = "1" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 89b03ce..60636af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,11 +25,27 @@ use futures_util::stream::{StreamExt, TryStreamExt}; use tokio_stream::wrappers::ReceiverStream; use tower_http::cors::CorsLayer; use faiss::index::scalar_quantizer; +use lazy_static::lazy_static; +use prometheus::{register_int_counter, register_int_counter_vec, register_int_gauge, Encoder, IntCounter, IntGauge, IntCounterVec}; mod ocr; use crate::ocr::scan_image; +lazy_static! { + static ref RELOADS_COUNTER: IntCounter = register_int_counter!("mse_reloads", "reloads executed").unwrap(); + static ref QUERIES_COUNTER: IntCounter = register_int_counter!("mse_queries", "queries executed").unwrap(); + static ref TERMS_COUNTER: IntCounterVec = register_int_counter_vec!("mse_terms", "terms used in queries, by type", &["type"]).unwrap(); + static ref IMAGES_LOADED_COUNTER: IntCounter = register_int_counter!("mse_loads", "images loaded by ingest process").unwrap(); + static ref IMAGES_LOADED_ERROR_COUNTER: IntCounter = register_int_counter!("mse_load_errors", "image load fails by ingest process").unwrap(); + static ref IMAGES_EMBEDDED_COUNTER: IntCounter = register_int_counter!("mse_embeds", "images embedded by ingest process").unwrap(); + static ref IMAGES_OCRED_COUNTER: IntCounter = register_int_counter!("mse_ocrs", "images OCRed by ingest process").unwrap(); + static ref IMAGES_OCRED_ERROR_COUNTER: IntCounter = register_int_counter!("mse_ocr_errors", "image OCR fails by ingest process").unwrap(); + static ref IMAGES_THUMBNAILED_COUNTER: IntCounter = register_int_counter!("mse_thumbnails", "images thumbnailed by ingest process").unwrap(); + static ref THUMBNAILS_GENERATED_COUNTER: IntCounterVec = register_int_counter_vec!("mse_thumbnail_outputs", "thumbnails produced by ingest process", &["output_format"]).unwrap(); + static ref LAST_INDEX_SIZE: IntGauge = register_int_gauge!("mse_index_size", "images in loaded index").unwrap(); +} + fn function_which_returns_50() -> usize { 50 } #[derive(Debug, Deserialize, Clone)] @@ -291,9 +307,11 @@ async fn ingest_files(config: Arc, backend: Arc) Ok(image) => image, Err(e) => { log::error!("Could not read {}: {}", record.filename, e); + IMAGES_LOADED_ERROR_COUNTER.inc(); return Ok(()) } }; + IMAGES_LOADED_COUNTER.inc(); if record.embedding.is_none() { let resized = resize_for_embed(backend.clone(), image.clone()).await?; @@ -394,11 +412,13 @@ async fn ingest_files(config: Arc, backend: Arc) format_config, ), ); + THUMBNAILS_GENERATED_COUNTER.get_metric_with_label_values(&[format_name]).unwrap().inc(); std::fs::write(thumbnail_path, resized)?; } } Ok::, anyhow::Error>(generated_formats) }).await??; + IMAGES_THUMBNAILED_COUNTER.inc(); let formats_data = rmp_serde::to_vec(&generated_formats)?; let ts = timestamp(); sqlx::query!( @@ -431,10 +451,12 @@ async fn ingest_files(config: Arc, backend: Arc) let scan = match scan_image(&client, &image.image).await { Ok(scan) => scan, Err(e) => { + IMAGES_OCRED_ERROR_COUNTER.inc(); log::error!("OCR failure {}: {}", image.filename, e); return Ok(()) } }; + IMAGES_OCRED_COUNTER.inc(); let ocr_text = scan .iter() .map(|segment| segment.text.clone()) @@ -484,6 +506,7 @@ async fn ingest_files(config: Arc, backend: Arc) for (i, vector) in result.into_iter().enumerate() { let vector = vector.into_vec(); log::debug!("embedded {}", batch[i].filename); + IMAGES_EMBEDDED_COUNTER.inc(); sqlx::query!( "UPDATE files SET embedding_time = ?, embedding = ? WHERE filename = ?", ts, @@ -721,16 +744,19 @@ async fn handle_request( for term in &req.terms { if let Some(image) = &term.image { + TERMS_COUNTER.get_metric_with_label_values(&["image"]).unwrap().inc(); let bytes = BASE64_STANDARD.decode(image)?; let image = Arc::new(tokio::task::block_in_place(|| image::load_from_memory(&bytes))?); image_batch.push(serde_bytes::ByteBuf::from(resize_for_embed(backend_config.clone(), image).await?)); image_weights.push(term.weight.unwrap_or(1.0)); } if let Some(text) = &term.text { + TERMS_COUNTER.get_metric_with_label_values(&["text"]).unwrap().inc(); text_batch.push(text.clone()); text_weights.push(term.weight.unwrap_or(1.0)); } if let Some(embedding) = &term.embedding { + TERMS_COUNTER.get_metric_with_label_values(&["embedding"]).unwrap().inc(); let weight = term.weight.unwrap_or(1.0); for (i, value) in embedding.iter().enumerate() { total_embedding[i] += value * weight; @@ -825,6 +851,7 @@ async fn main() -> Result<()> { Ok(_) => { match build_index(config.clone(), backend.clone()).await { Ok(new_index) => { + LAST_INDEX_SIZE.set(new_index.vectors.ntotal() as i64); *index.write().await = new_index; } Err(e) => { @@ -839,6 +866,7 @@ async fn main() -> Result<()> { } } ingest_done_tx.send((true, format!("OK"))).unwrap(); + RELOADS_COUNTER.inc(); request_ingest_rx.recv().await; } } @@ -854,6 +882,7 @@ async fn main() -> Result<()> { let backend_config = backend.clone(); let index = index.read().await; // TODO: use ConcurrentIndex here let client = client.clone(); + QUERIES_COUNTER.inc(); handle_request(&config, backend_config, client.clone(), &index, req).await.map_err(|e| format!("{:?}", e)) })) .route("/", get(|_req: axum::http::Request| async move { @@ -881,6 +910,13 @@ async fn main() -> Result<()> { } } })) + .route("/metrics", get(|_req: axum::http::Request| async move { + let mut buffer = Vec::new(); + let encoder = prometheus::TextEncoder::new(); + let metric_families = prometheus::gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + buffer + })) .layer(cors); let addr = format!("0.0.0.0:{}", config_.port);