mirror of https://github.com/osmarks/maghammer.git synced 2024-12-21 22:20:29 +00:00

update dependencies, better tracing, fix semantic chunking, fix bug

This commit is contained in:
osmarks 2024-10-20 07:22:42 +01:00
parent 503c64f989
commit a11bc0b22d
17 changed files with 1309 additions and 732 deletions

.gitignore vendored Normal file (1 line changed)
View File

@ -0,0 +1 @@
target

Cargo.lock generated (1335 lines changed)

File diff suppressed because it is too large

View File

@ -5,7 +5,7 @@ edition = "2021"
[dependencies]
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4", "with-serde_json-1"] }
tokio = { version = "1", features = ["full"]}
tokio = { version = "1", features = ["full", "tracing"]}
chrono = { version = "0.4", features = ["serde"] }
anyhow = "1"
async-walkdir = "2"
@ -16,8 +16,6 @@ toml = "0.8"
serde = { version = "1", features = ["derive"] }
reqwest = "0.12"
deadpool-postgres = "0.14"
log = "0.4"
pretty_env_logger = "0.5"
pgvector = { version = "0.3", features = ["postgres", "halfvec"] }
tokenizers = { version = "0.19", features = ["http"] }
regex = "1"
@ -43,3 +41,10 @@ sea-query = { version = "0.30", features = ["backend-postgres", "postgres-array"
sea-query-postgres = { version = "0.4", features = ["postgres-array"] }
ulid = { version = "1", features = ["serde"] }
mail-parser = "0.9"
pcre2 = "0.2"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
console-subscriber = "0.4"
[patch.crates-io]
pcre2-sys = { git = "https://github.com/osmarks/rust-pcre2/", rev = "ec7d5cf" }
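
The dependency changes above replace the log / pretty_env_logger stack with tracing, tracing-subscriber (with the env-filter feature) and console-subscriber, and enable tokio's "tracing" feature to match; the logging calls throughout the rest of the diff are updated accordingly. As a rough sketch only (not code from this commit), a minimal RUST_LOG-driven replacement for pretty_env_logger::init() using these crates would be:

fn init_tracing() {
    // Print events to the terminal, filtered by the RUST_LOG environment
    // variable (needs tracing-subscriber's "env-filter" feature, enabled above).
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();
}

The commit itself instead installs console-subscriber's ConsoleLayer as the global subscriber (see the main() change further down), which also exports tokio task instrumentation for inspection with tokio-console.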

View File

@ -16,10 +16,9 @@ print("Models loaded.")
BASE = "/media/"
conn = psycopg2.connect("dbname=maghammer user=maghammer")
conn2 = psycopg2.connect("dbname=maghammer user=maghammer")
csr = conn.cursor()
csr2 = conn.cursor()
csr.execute("SELECT id, path FROM media_files WHERE auto_subs_state = 1") # PENDING
with conn.cursor() as csr:
csr.execute("SELECT id, path FROM media_files WHERE auto_subs_state = 1") # PENDING
rows = csr.fetchall()
def format_duration(seconds):
hours = int(seconds / 3600.0)
@ -29,7 +28,8 @@ def format_duration(seconds):
full_seconds = int(seconds)
return f"{hours:02}:{minutes:02}:{full_seconds:02}"
while row := csr.fetchone():
print(f"Processing {len(rows)} files...")
for row in rows:
file = row[1]
docid = row[0]
start = time.time()
@ -55,5 +55,6 @@ while row := csr.fetchone():
subs += f"[{format_duration(seg['start'])} -> {format_duration(seg['end'])}]: {seg['text'].strip()}\n"
subs = subs.strip()
csr2.execute("UPDATE media_files SET subs = %s, auto_subs_state = 2 WHERE id = %s", (subs, docid)) # GENERATED
conn2.commit()
with conn.cursor() as csr:
csr.execute("UPDATE media_files SET subs = %s, auto_subs_state = 2 WHERE id = %s", (subs, docid)) # GENERATED
conn.commit()

View File

@ -5,7 +5,7 @@ concurrency = 8
backend = "http://100.64.0.10:1706"
embedding_dim = 1024
tokenizer = "Snowflake/snowflake-arctic-embed-l"
max_tokens = 510
max_tokens = 128
batch_size = 256
[indexers.text_files]
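
Dropping max_tokens from 510 to 128 presumably relates to the "fix semantic chunking" item in the commit message: chunks sent to the embedding backend configured above are now capped at 128 tokens of the Snowflake/snowflake-arctic-embed-l tokenizer. The chunking code itself is not visible in this diff; purely as a hedged sketch (not maghammer's implementation), token-bounded chunking with the tokenizers crate could look like:

// Sketch only: split `text` into pieces of at most `max_tokens` tokens.
// Decoding re-joins the tokens, so whitespace may be slightly normalised.
fn chunk_by_tokens(tokenizer: &tokenizers::Tokenizer, text: &str, max_tokens: usize) -> anyhow::Result<Vec<String>> {
    let encoding = tokenizer.encode(text, false).map_err(|e| anyhow::anyhow!(e))?;
    encoding
        .get_ids()
        .chunks(max_tokens)
        .map(|ids| tokenizer.decode(ids, true).map_err(|e| anyhow::anyhow!(e)))
        .collect()
}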

View File

@ -1,5 +1,5 @@
use std::{collections::HashSet, sync::Arc};
use anyhow::{Context, Result};
use anyhow::Result;
use compact_str::CompactString;
use deadpool_postgres::Pool;
use futures::{pin_mut, TryStreamExt};
@ -62,14 +62,25 @@ pub trait Indexer: Sync + Send {
pub async fn delete_nonexistent_files(ctx: Arc<Ctx>, select_paths: &str, delete_by_id: &str, existing: &HashSet<CompactString>) -> Result<()> {
let conn = ctx.pool.get().await?;
let mut conn2 = ctx.pool.get().await?;
let tx = conn2.transaction().await?;
let it = conn.query_raw(select_paths, [""; 0]).await?;
pin_mut!(it);
while let Some(row) = it.try_next().await? {
let path: String = row.get(0);
let path = CompactString::from(path);
if !existing.contains(&path) {
conn.execute(delete_by_id, &[&hash_str(&path)]).await?;
tx.execute(delete_by_id, &[&hash_str(&path)]).await?;
}
}
tx.commit().await?;
Ok(())
}
impl std::fmt::Debug for dyn Indexer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Indexer")
.field("name", &self.name())
.finish()
}
}

View File

@ -6,6 +6,7 @@ use crate::{indexer::{ColumnSpec, Ctx, Indexer, TableSpec}, util::hash_str};
use chrono::prelude::*;
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
use rusqlite::OpenFlags;
use tracing::instrument;
#[derive(Serialize, Deserialize)]
struct Config {
@ -47,7 +48,7 @@ CREATE TABLE shell_history (
ColumnSpec {
name: "command",
fts: true,
fts_short: false,
fts_short: true,
trigram: false,
is_array: false
},
@ -103,6 +104,7 @@ impl AtuinIndexer {
}))
}
#[instrument(skip(self, target))]
fn read_database(&self, target: tokio::sync::mpsc::Sender<(String, i64, i64, i32, String, String, String, String)>, min_timestamp: i64) -> Result<()> {
let conn = rusqlite::Connection::open_with_flags(&self.config.db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
let mut fetch_stmt = conn.prepare("SELECT id, timestamp, duration, exit, command, cwd, session, hostname FROM history WHERE timestamp > ? ORDER BY timestamp ASC")?;

View File

@ -13,6 +13,7 @@ use crate::util::{hash_str, parse_html, parse_date, systemtime_to_utc, CONFIG};
use crate::indexer::{delete_nonexistent_files, Ctx, Indexer, TableSpec, ColumnSpec};
use chrono::prelude::*;
use std::str::FromStr;
use tracing::instrument;
#[derive(Serialize, Deserialize, Clone)]
struct Config {
@ -35,6 +36,7 @@ pub struct EpubParse {
tags: Vec<String>
}
#[instrument]
pub fn parse_epub(path: &PathBuf) -> Result<EpubParse> {
let mut epub = EpubDoc::new(path).context("initial doc load")?;
@ -58,6 +60,8 @@ pub fn parse_epub(path: &PathBuf) -> Result<EpubParse> {
tags: epub.metadata.get("subject").cloned().unwrap_or_else(Vec::new)
};
tracing::trace!("read epub {:?}", path);
let mut seen = HashSet::new();
for navpoint in epub.toc.clone() {
let path = navpoint.content;
@ -71,16 +75,18 @@ pub fn parse_epub(path: &PathBuf) -> Result<EpubParse> {
seen.insert(last.to_string());
let resource = epub.get_resource_str_by_path(last).with_context(|| format!("resource {} nonexistent", last))?;
let html = parse_html(resource.as_bytes(), true);
tracing::trace!("read epub chapter {:?}", navpoint.label);
parse.chapters.push((
html.0,
navpoint.label.clone()
))
));
}
}
Ok(parse)
}
#[instrument(skip(ctx, existing_files))]
async fn handle_epub(relpath: CompactString, ctx: Arc<Ctx>, entry: DirEntry, existing_files: Arc<RwLock<HashSet<CompactString>>>) -> Result<()> {
let epub = entry.path();
@ -95,7 +101,7 @@ async fn handle_epub(relpath: CompactString, ctx: Arc<Ctx>, entry: DirEntry, exi
let parse_result = match tokio::task::spawn_blocking(move || parse_epub(&epub)).await? {
Ok(x) => x,
Err(e) => {
log::warn!("Failed parse for {}: {}", relpath, e);
tracing::warn!("Failed parse for {}: {}", relpath, e);
return Ok(())
}
};

View File

@ -5,8 +5,9 @@ use serde::{Deserialize, Serialize};
use crate::{indexer::{ColumnSpec, Ctx, Indexer, TableSpec}, util::hash_str};
use chrono::prelude::*;
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
use rusqlite::OpenFlags;
use rusqlite::{types::ValueRef, OpenFlags};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
#[derive(Serialize, Deserialize)]
struct Config {
@ -63,14 +64,14 @@ CREATE TABLE history (
ColumnSpec {
name: "title",
fts: true,
fts_short: false,
fts_short: true,
trigram: true,
is_array: false
},
ColumnSpec {
name: "description",
fts: true,
fts_short: false,
fts_short: true,
trigram: false,
is_array: false
}
@ -162,12 +163,19 @@ impl FirefoxHistoryDumpIndexer {
}))
}
#[instrument(skip(places, bookmarks, history, self))]
fn read_database(&self, places: Sender<(String, String, Option<String>, i64, Option<i64>, Option<String>, Option<String>)>, places_min_ts: i64, bookmarks: Sender<(String, String, String, i64, i64)>, bookmarks_min_ts: i64, history: Sender<(String, String, i64, i32)>, history_min_ts: i64) -> Result<()> {
let conn = rusqlite::Connection::open_with_flags(&self.config.db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
// Apparently some records in my history database have no last_visit_date. I don't know what happened to it or why, but this is not fixable, so add an ugly hack for them.
let mut fetch_places = conn.prepare(if self.config.include_untimestamped_records { "SELECT guid, url, title, visit_count, last_visit_date, description, preview_image_url FROM places WHERE last_visit_date > ? OR last_visit_date IS NULL ORDER BY last_visit_date ASC" } else { "SELECT guid, url, title, visit_count, last_visit_date, description, preview_image_url FROM places WHERE last_visit_date > ? ORDER BY last_visit_date ASC" })?;
for row in fetch_places.query_map([places_min_ts], |row| {
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, row.get(5)?, row.get(6)?))
let description = row.get_ref(5)?;
let description = match description {
ValueRef::Null => None,
ValueRef::Text(x) => Some(String::from_utf8_lossy(x).to_string()),
_ => panic!("unexpected type")
};
Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, description, row.get(6)?))
})? {
let row = row?;
places.blocking_send(row)?;
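
The get_ref / ValueRef handling of the description column above is likely the "fix bug" part of the commit message: it accepts rows where description is NULL or not valid UTF-8, which a plain row.get::<_, Option<String>>(5) would fail on in the invalid-UTF-8 case. A minimal sketch of the same pattern as a standalone helper (hypothetical, not part of maghammer):

fn get_lossy_text(row: &rusqlite::Row, idx: usize) -> rusqlite::Result<Option<String>> {
    use rusqlite::types::ValueRef;
    // Read the raw value so NULL and invalid UTF-8 can be handled explicitly
    // instead of failing the whole row.
    Ok(match row.get_ref(idx)? {
        ValueRef::Null => None,
        ValueRef::Text(bytes) => Some(String::from_utf8_lossy(bytes).to_string()),
        _ => panic!("unexpected column type"),
    })
}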

View File

@ -11,9 +11,10 @@ use tokio::process::Command;
use tokio::sync::RwLock;
use crate::util::{hash_str, parse_date, parse_html, systemtime_to_utc, urlencode, CONFIG};
use crate::indexer::{Ctx, Indexer, TableSpec, delete_nonexistent_files, ColumnSpec};
use async_walkdir::WalkDir;
use async_walkdir::{Filtering, WalkDir};
use chrono::prelude::*;
use regex::{RegexSet, Regex};
use tracing::instrument;
#[derive(Serialize, Deserialize, Debug)]
struct Config {
@ -89,6 +90,7 @@ struct Stream {
#[derive(Deserialize, Debug)]
struct Format {
duration: String,
#[serde(default)]
tags: HashMap<String, String>
}
@ -129,6 +131,7 @@ enum SRTParseState {
ExpectData
}
#[instrument(skip(srt))]
fn parse_srt(srt: &str) -> Result<String> {
use SRTParseState::*;
@ -208,7 +211,7 @@ mod test {
let mut s = String::new();
format_duration(parse_duration("00:01:02,410").unwrap(), &mut s);
assert_eq!(s, "00:01:02.410");
assert_eq!(s, "00:01:02");
}
}
@ -253,7 +256,9 @@ fn score_subtitle_stream(stream: &Stream, others: &Vec<Stream>, parse_so_far: &M
1
}
#[instrument]
async fn parse_media(path: &PathBuf, ignore: Arc<RegexSet>) -> Result<MediaParse> {
tracing::trace!("starting ffprobe");
let ffmpeg = Command::new("ffprobe")
.arg("-hide_banner")
.arg("-print_format").arg("json")
@ -266,6 +271,7 @@ async fn parse_media(path: &PathBuf, ignore: Arc<RegexSet>) -> Result<MediaParse
if !ffmpeg.status.success() {
return Err(anyhow!("ffmpeg failure: {}", String::from_utf8_lossy(&ffmpeg.stderr)))
}
tracing::trace!("ffprobe successful");
let probe: Probe = serde_json::from_slice(&ffmpeg.stdout).context("ffmpeg parse")?;
let mut result = MediaParse {
@ -336,6 +342,8 @@ async fn parse_media(path: &PathBuf, ignore: Arc<RegexSet>) -> Result<MediaParse
// if we have any remotely acceptable subtitles, use ffmpeg to read them out
if best_subtitle_track.1 > i8::MIN {
tracing::trace!("reading subtitle track {:?}", best_subtitle_track);
let ffmpeg = Command::new("ffmpeg")
.arg("-hide_banner").arg("-v").arg("quiet")
.arg("-i").arg(path)
@ -463,13 +471,30 @@ CREATE TABLE media_files (
async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
let entries = WalkDir::new(&self.config.path);
let ignore = &self.ignore_files;
let base_path = &self.config.path;
let ignore = Arc::new(self.ignore_files.clone());
let base_path = Arc::new(self.config.path.clone());
let base_path_ = base_path.clone();
let ignore_metadata = self.ignore_metadata.clone();
let existing_files = Arc::new(RwLock::new(HashSet::new()));
let existing_files_ = existing_files.clone();
let ctx_ = ctx.clone();
entries
.filter(move |entry| {
let ignore = ignore.clone();
let base_path = base_path.clone();
let path = entry.path();
tracing::trace!("filtering {:?}", path);
if let Some(path) = path.strip_prefix(&*base_path).ok().and_then(|x| x.to_str()) {
if ignore.is_match(path) {
return std::future::ready(Filtering::IgnoreDir);
}
} else {
return std::future::ready(Filtering::IgnoreDir);
}
std::future::ready(Filtering::Continue)
})
.map_err(|e| anyhow::Error::from(e))
.filter(|r| {
// ignore permissions errors because things apparently break otherwise
@ -479,18 +504,20 @@ CREATE TABLE media_files (
};
async move { keep }
})
.try_for_each_concurrent(Some(CONFIG.concurrency), |entry| {
let ctx = ctx.clone();
let existing_files = existing_files.clone();
.try_for_each_concurrent(Some(CONFIG.concurrency), move |entry| {
tracing::trace!("got file {:?}", entry.path());
let ctx = ctx_.clone();
let base_path = base_path_.clone();
let existing_files = existing_files_.clone();
let ignore_metadata = ignore_metadata.clone();
async move {
let real_path = entry.path();
let path = if let Some(path) = real_path.strip_prefix(base_path)?.to_str() {
let path = if let Some(path) = real_path.strip_prefix(&*base_path)?.to_str() {
path
} else {
return Result::Ok(());
};
if ignore.is_match(path) || !entry.file_type().await?.is_file() {
if !entry.file_type().await?.is_file() {
return Ok(());
}
let mut conn = ctx.pool.get().await?;
@ -499,6 +526,7 @@ CREATE TABLE media_files (
let row = conn.query_opt("SELECT timestamp FROM media_files WHERE id = $1", &[&hash_str(path)]).await?;
let timestamp: DateTime<Utc> = row.map(|r| r.get(0)).unwrap_or(DateTime::<Utc>::MIN_UTC);
let modtime = systemtime_to_utc(metadata.modified()?)?;
tracing::trace!("timestamp {:?}", timestamp);
if modtime > timestamp {
match parse_media(&real_path, ignore_metadata).await {
Ok(x) => {
@ -526,9 +554,11 @@ CREATE TABLE media_files (
tx.commit().await?;
},
Err(e) => {
log::warn!("Media parse {}: {:?}", &path, e)
tracing::warn!("Media parse {}: {:?}", &path, e)
}
}
} else {
tracing::trace!("skipping {:?}", path);
}
Result::Ok(())
}

View File

@ -6,6 +6,7 @@ use crate::util::hash_str;
use crate::indexer::{Ctx, Indexer, TableSpec, ColumnSpec};
use chrono::prelude::*;
use rusqlite::OpenFlags;
use tracing::instrument;
#[derive(Serialize, Deserialize)]
struct Config {
@ -257,57 +258,7 @@ CREATE TABLE IF NOT EXISTS mino_files (
}
while let Some((id, object)) = rx.recv().await {
match object {
minoteaur_types::Object::Page(page) => {
// If we already have the latest information on this page, skip it.
if let Some((updated_timestamp, _last_view_timestamp)) = timestamps.get(&id) {
if *updated_timestamp >= page.updated {
continue;
}
}
let ulid = id.to_string();
let int_id = hash_str(&ulid);
let tx = conn.transaction().await?;
tx.execute("DELETE FROM mino_pages WHERE id = $1", &[&int_id]).await?;
tx.execute("INSERT INTO mino_pages VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 0, 0, $10)",
&[&int_id, &ulid, &page.updated, &page.created, &page.title, &(page.size.words as i32), &page.tags.into_iter().collect::<Vec<String>>(), &page.names.into_iter().collect::<Vec<String>>(), &page.content, &page.created])
.await?;
for (key, value) in page.structured_data {
let (num, text) = match value {
minoteaur_types::Value::Number(x) => (Some(x), None),
minoteaur_types::Value::Text(t) => (None, Some(t))
};
tx.execute("INSERT INTO mino_structured_data (page, key, numeric_value, text_value) VALUES ($1, $2, $3, $4)", &[&int_id, &key, &num, &text]).await?;
}
for (_key, file) in page.files {
tx.execute("INSERT INTO mino_files (page, filename, size, timestamp, metadata) VALUES ($1, $2, $3, $4, $5)", &[&int_id, &file.filename, &(file.size as i32), &file.created, &tokio_postgres::types::Json(file.metadata)]).await?;
}
tx.commit().await?;
},
// These should only occur after the page's record, with the exception of page creation.
minoteaur_types::Object::PageView(view) => {
if let Some((_updated_timestamp, last_view_timestamp)) = timestamps.get(&view.page) {
if *last_view_timestamp >= view.time {
continue;
}
}
let int_id = hash_str(&view.page.to_string());
conn.execute("UPDATE mino_pages SET view_count = view_count + 1, last_view_timestamp = $2 WHERE id = $1", &[&int_id, &view.time]).await?;
},
minoteaur_types::Object::Revision(rev) => {
// There's no separate "last revision timestamp" because revisions should always be associated with the updated field being adjusted.
if let Some((updated_timestamp, _last_view_timestamp)) = timestamps.get(&rev.page) {
if *updated_timestamp >= rev.time {
continue;
}
}
if let minoteaur_types::RevisionType::PageCreated = rev.ty {
continue;
}
let int_id = hash_str(&rev.page.to_string());
conn.execute("UPDATE mino_pages SET revision_count = revision_count + 1 WHERE id = $1", &[&int_id]).await?;
}
}
MinoteaurIndexer::write_object(&mut conn, id, object, &timestamps).await?;
}
// Minoteaur doesn't have a delete button so not supporting deletes is clearly fine, probably.
@ -345,4 +296,60 @@ impl MinoteaurIndexer {
}
Ok(())
}
#[instrument(skip(conn, timestamps))]
async fn write_object(conn: &mut tokio_postgres::Client, id: ulid::Ulid, object: minoteaur_types::Object, timestamps: &HashMap<ulid::Ulid, (DateTime<Utc>, DateTime<Utc>)>) -> Result<()> {
match object {
minoteaur_types::Object::Page(page) => {
// If we already have the latest information on this page, skip it.
if let Some((updated_timestamp, _last_view_timestamp)) = timestamps.get(&id) {
if *updated_timestamp >= page.updated {
return Ok(());
}
}
let ulid = id.to_string();
let int_id = hash_str(&ulid);
let tx = conn.transaction().await?;
tx.execute("DELETE FROM mino_pages WHERE id = $1", &[&int_id]).await?;
tx.execute("INSERT INTO mino_pages VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 0, 0, $10)",
&[&int_id, &ulid, &page.updated, &page.created, &page.title, &(page.size.words as i32), &page.tags.into_iter().collect::<Vec<String>>(), &page.names.into_iter().collect::<Vec<String>>(), &page.content, &page.created])
.await?;
for (key, value) in page.structured_data {
let (num, text) = match value {
minoteaur_types::Value::Number(x) => (Some(x), None),
minoteaur_types::Value::Text(t) => (None, Some(t))
};
tx.execute("INSERT INTO mino_structured_data (page, key, numeric_value, text_value) VALUES ($1, $2, $3, $4)", &[&int_id, &key, &num, &text]).await?;
}
for (_key, file) in page.files {
tx.execute("INSERT INTO mino_files (page, filename, size, timestamp, metadata) VALUES ($1, $2, $3, $4, $5)", &[&int_id, &file.filename, &(file.size as i32), &file.created, &tokio_postgres::types::Json(file.metadata)]).await?;
}
tx.commit().await?;
},
// These should only occur after the page's record, with the exception of page creation.
minoteaur_types::Object::PageView(view) => {
if let Some((_updated_timestamp, last_view_timestamp)) = timestamps.get(&view.page) {
if *last_view_timestamp >= view.time {
return Ok(());
}
}
let int_id = hash_str(&view.page.to_string());
conn.execute("UPDATE mino_pages SET view_count = view_count + 1, last_view_timestamp = $2 WHERE id = $1", &[&int_id, &view.time]).await?;
},
minoteaur_types::Object::Revision(rev) => {
// There's no separate "last revision timestamp" because revisions should always be associated with the updated field being adjusted.
if let Some((updated_timestamp, _last_view_timestamp)) = timestamps.get(&rev.page) {
if *updated_timestamp >= rev.time {
return Ok(());
}
}
if let minoteaur_types::RevisionType::PageCreated = rev.ty {
return Ok(());
}
let int_id = hash_str(&rev.page.to_string());
conn.execute("UPDATE mino_pages SET revision_count = revision_count + 1 WHERE id = $1", &[&int_id]).await?;
}
}
Ok(())
}
}

View File

@ -2,9 +2,11 @@ use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc};
use anyhow::{Context, Result};
use futures::pin_mut;
use serde::{Deserialize, Serialize};
use crate::{indexer::{ColumnSpec, Ctx, Indexer, TableSpec}, util::{hash_str, parse_html, systemtime_to_utc}};
use crate::{indexer::{ColumnSpec, Ctx, Indexer, TableSpec}, util::{parse_html, systemtime_to_utc}};
use chrono::prelude::*;
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
use tracing::instrument;
use std::pin::Pin;
#[derive(Serialize, Deserialize)]
struct Config {
@ -94,50 +96,7 @@ CREATE TABLE webpages (
pin_mut!(writer);
while let Some(entry) = readdir.next_entry().await? {
if entry.file_type().await?.is_file() {
let path = entry.path();
let name = path.file_name().unwrap().to_str().context("invalid path")?;
// for some reason there are several filename formats in storage
let mut metadata = None;
if name.starts_with("firefox-recoll-web") {
let meta_file = "_".to_owned() + name;
let mtime = systemtime_to_utc(entry.metadata().await?.modified()?)?;
if let Some(url) = tokio::fs::read_to_string(input.join(&meta_file)).await?.lines().next() {
metadata = Some((mtime, url.to_string()));
tokio::fs::rename(input.join(&meta_file), output.join(&meta_file)).await?;
} else {
log::warn!("Metadata error for {}", name);
}
}
if let Some(rem) = name.strip_suffix(".html") {
let meta_file = format!("{}.dic", rem);
let meta = parse_circache_meta(&tokio::fs::read_to_string(input.join(&meta_file)).await?).context("invalid metadata format")?;
let mtime = i64::from_str(meta.get("fmtime").context("invalid metadata format")?)?;
metadata = Some((DateTime::from_timestamp(mtime, 0).context("time is broken")?, meta.get("url").context("invalid metadata format")?.to_string()));
move_file(&input.join(&meta_file), output.join(&meta_file)).await?;
}
if let Some(rem) = name.strip_prefix("recoll-we-c") {
let meta_file = format!("recoll-we-m{}", rem);
let mtime = systemtime_to_utc(entry.metadata().await?.modified()?)?;
if tokio::fs::try_exists(input.join(&meta_file)).await? {
if let Some(url) = tokio::fs::read_to_string(input.join(&meta_file)).await?.lines().next() {
metadata = Some((mtime, url.to_string()));
move_file(&input.join(&meta_file), output.join(&meta_file)).await?;
} else {
log::warn!("Metadata error for {}", name);
}
} else {
log::warn!("No metadata for {}", name);
}
}
if let Some((mtime, url)) = metadata {
let html = tokio::fs::read(&path).await?;
move_file(&path, output.join(name)).await?;
let (content, title) = parse_html(html.as_slice(), true);
writer.as_mut().write(&[&mtime, &url, &title.replace("\0", ""), &content.replace("\0", ""), &String::from_utf8_lossy(&html).replace("\0", "")]).await?;
}
}
RclweIndexer::process_entry(entry, &input, &output, &mut writer).await?;
}
writer.finish().await?;
@ -157,4 +116,57 @@ impl RclweIndexer {
config: Arc::new(config)
}))
}
#[instrument(skip(writer))]
async fn process_entry(entry: tokio::fs::DirEntry, input: &PathBuf, output: &PathBuf, writer: &mut Pin<&mut BinaryCopyInWriter>) -> Result<()> {
if entry.file_type().await?.is_file() {
let path = entry.path();
let name = path.file_name().unwrap().to_str().context("invalid path")?;
// for some reason there are several filename formats in storage
let mut metadata = None;
if name.starts_with("firefox-recoll-web") {
tracing::trace!("reading firefox-recoll-web");
let meta_file = "_".to_owned() + name;
let mtime = systemtime_to_utc(entry.metadata().await?.modified()?)?;
if let Some(url) = tokio::fs::read_to_string(input.join(&meta_file)).await?.lines().next() {
metadata = Some((mtime, url.to_string()));
tokio::fs::rename(input.join(&meta_file), output.join(&meta_file)).await?;
} else {
tracing::warn!("Metadata error for {}", name);
}
}
if let Some(rem) = name.strip_suffix(".html") {
tracing::trace!("reading circache html");
let meta_file = format!("{}.dic", rem);
let meta = parse_circache_meta(&tokio::fs::read_to_string(input.join(&meta_file)).await?).context("invalid metadata format")?;
let mtime = i64::from_str(meta.get("fmtime").context("invalid metadata format")?)?;
metadata = Some((DateTime::from_timestamp(mtime, 0).context("time is broken")?, meta.get("url").context("invalid metadata format")?.to_string()));
move_file(&input.join(&meta_file), output.join(&meta_file)).await?;
}
if let Some(rem) = name.strip_prefix("recoll-we-c") {
tracing::trace!("reading recoll-we-c");
let meta_file = format!("recoll-we-m{}", rem);
let mtime = systemtime_to_utc(entry.metadata().await?.modified()?)?;
if tokio::fs::try_exists(input.join(&meta_file)).await? {
if let Some(url) = tokio::fs::read_to_string(input.join(&meta_file)).await?.lines().next() {
metadata = Some((mtime, url.to_string()));
move_file(&input.join(&meta_file), output.join(&meta_file)).await?;
} else {
tracing::warn!("Metadata error for {}", name);
}
} else {
tracing::warn!("No metadata for {}", name);
}
}
if let Some((mtime, url)) = metadata {
let html = tokio::fs::read(&path).await?;
move_file(&path, output.join(name)).await?;
let (content, title) = parse_html(html.as_slice(), true);
writer.as_mut().write(&[&mtime, &url, &title.replace("\0", ""), &content.replace("\0", ""), &String::from_utf8_lossy(&html).replace("\0", "")]).await?;
}
}
Ok(())
}
}

View File

@ -6,13 +6,14 @@ use compact_str::CompactString;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use tracing::instrument;
use crate::util::{hash_str, parse_html, parse_pdf, systemtime_to_utc, urlencode, CONFIG};
use crate::indexer::{Ctx, Indexer, TableSpec, delete_nonexistent_files, ColumnSpec};
use async_walkdir::WalkDir;
use chrono::prelude::*;
use regex::RegexSet;
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
struct Config {
path: String,
#[serde(default)]
@ -20,6 +21,7 @@ struct Config {
base_url: String
}
#[derive(Debug)]
pub struct TextFilesIndexer {
config: Config,
ignore: RegexSet
@ -93,54 +95,7 @@ CREATE TABLE text_files (
{
let ctx = ctx.clone();
let existing_files = existing_files.clone();
async move {
let real_path = entry.path();
let path = if let Some(path) = real_path.strip_prefix(base_path)?.to_str() {
path
} else {
return Result::Ok(());
};
let ext = real_path.extension().and_then(OsStr::to_str);
if ignore.is_match(path) || !entry.file_type().await?.is_file() || !VALID_EXTENSIONS.contains(ext.unwrap_or_default()) {
return Ok(());
}
let mut conn = ctx.pool.get().await?;
existing_files.write().await.insert(CompactString::from(path));
let metadata = entry.metadata().await?;
let row = conn.query_opt("SELECT timestamp FROM text_files WHERE id = $1", &[&hash_str(path)]).await?;
let timestamp: DateTime<Utc> = row.map(|r| r.get(0)).unwrap_or(DateTime::<Utc>::MIN_UTC);
let modtime = systemtime_to_utc(metadata.modified()?)?;
if modtime > timestamp {
let parse = match ext {
Some("pdf") => {
parse_pdf(&real_path).await.map(Some)
},
Some("txt") => {
let content = tokio::fs::read(&real_path).await?;
Ok(Some((String::from_utf8_lossy(&content).to_string(), String::new())))
},
Some("htm") | Some("html") | Some("xhtml") => {
let content = tokio::fs::read(&real_path).await?;
Ok(Some(tokio::task::block_in_place(|| parse_html(&content, true))))
},
_ => Ok(None),
};
match parse {
Ok(None) => (),
Ok(Some((content, title))) => {
// Null bytes aren't legal in Postgres strings despite being valid UTF-8.
let tx = conn.transaction().await?;
tx.execute("DELETE FROM text_files WHERE id = $1", &[&hash_str(path)]).await?;
tx.execute("INSERT INTO text_files VALUES ($1, $2, $3, $4, $5)",
&[&hash_str(path), &path, &title.replace("\0", ""), &content.replace("\0", ""), &modtime])
.await?;
tx.commit().await?;
},
Err(e) => log::warn!("File parse for {}: {}", path, e)
}
}
Result::Ok(())
}
TextFilesIndexer::process_file(entry, ctx, ignore, existing_files, base_path)
}).await?;
{
@ -164,4 +119,59 @@ impl TextFilesIndexer {
config
}))
}
#[instrument(skip(ctx, ignore, existing_files, base_path))]
async fn process_file(entry: async_walkdir::DirEntry, ctx: Arc<Ctx>, ignore: &RegexSet, existing_files: Arc<RwLock<HashSet<CompactString>>>, base_path: &String) -> Result<()> {
let real_path = entry.path();
let path = if let Some(path) = real_path.strip_prefix(base_path)?.to_str() {
path
} else {
return Result::Ok(());
};
let ext = real_path.extension().and_then(OsStr::to_str);
if ignore.is_match(path) || !entry.file_type().await?.is_file() || !VALID_EXTENSIONS.contains(ext.unwrap_or_default()) {
return Ok(());
}
let mut conn = ctx.pool.get().await?;
existing_files.write().await.insert(CompactString::from(path));
let metadata = entry.metadata().await?;
let row = conn.query_opt("SELECT timestamp FROM text_files WHERE id = $1", &[&hash_str(path)]).await?;
let timestamp: DateTime<Utc> = row.map(|r| r.get(0)).unwrap_or(DateTime::<Utc>::MIN_UTC);
let modtime = systemtime_to_utc(metadata.modified()?)?;
if modtime > timestamp {
let parse = TextFilesIndexer::read_file(&real_path, ext).await;
match parse {
Ok(None) => (),
Ok(Some((content, title))) => {
// Null bytes aren't legal in Postgres strings despite being valid UTF-8.
let tx = conn.transaction().await?;
tx.execute("DELETE FROM text_files WHERE id = $1", &[&hash_str(path)]).await?;
tx.execute("INSERT INTO text_files VALUES ($1, $2, $3, $4, $5)",
&[&hash_str(path), &path, &title.replace("\0", ""), &content.replace("\0", ""), &modtime])
.await?;
tx.commit().await?;
},
Err(e) => tracing::warn!("File parse for {}: {}", path, e)
}
}
Ok(())
}
#[instrument]
async fn read_file(path: &std::path::PathBuf, ext: Option<&str>) -> Result<Option<(String, String)>> {
match ext {
Some("pdf") => {
parse_pdf(&path).await.map(Some)
},
Some("txt") => {
let content = tokio::fs::read(&path).await?;
Ok(Some((String::from_utf8_lossy(&content).to_string(), String::new())))
},
Some("htm") | Some("html") | Some("xhtml") => {
let content = tokio::fs::read(&path).await?;
Ok(Some(tokio::task::block_in_place(|| parse_html(&content, true))))
},
_ => Ok(None),
}
}
}

View File

@ -12,6 +12,8 @@ use chrono::prelude::*;
use tokio::{fs::File, io::{AsyncBufReadExt, BufReader}};
use mail_parser::MessageParser;
use std::future::Future;
use compact_str::CompactString;
use tracing::instrument;
#[derive(Serialize, Deserialize)]
struct Config {
@ -47,6 +49,7 @@ lazy_static::lazy_static! {
static ref MULTIPART_REGEX: regex::bytes::Regex = regex::bytes::RegexBuilder::new(r#"content-type:\s*multipart/(mixed|alternative);\s*boundary="?([A-Za-z0-9=_-]+)"?;?\r\n$"#).case_insensitive(true).build().unwrap();
}
#[instrument(skip(callback))]
async fn read_mbox<U: Future<Output=Result<()>>, F: FnMut(Email) -> U>(path: &PathBuf, mut callback: F) -> Result<()> {
let input = File::open(path).await?;
let mut buf = Vec::new();
@ -153,7 +156,7 @@ CREATE TABLE IF NOT EXISTS emails (
ColumnSpec {
name: "subject",
fts: true,
fts_short: false,
fts_short: true,
trigram: true,
is_array: false
},
@ -179,42 +182,14 @@ CREATE TABLE IF NOT EXISTS emails (
while let Some(entry) = entries.try_next().await? {
let path = entry.path();
let mbox = path.file_stem().and_then(|x| x.to_str()).context("invalid path")?.to_compact_string();
let folder = path.parent().unwrap().file_name().unwrap().to_str().unwrap();
if let Some(account) = config.account_mapping.get(folder) {
let folder = path.parent().unwrap().file_name().unwrap().to_str().unwrap().to_compact_string();
if let Some(account) = config.account_mapping.get(&*folder) {
let account = account.to_compact_string();
let ext = path.extension();
if let None = ext {
if !self.config.ignore_mboxes.contains(mbox.as_str()) {
let ctx = ctx.clone();
js.spawn(async move {
read_mbox(&path, move |mail| {
let ctx = ctx.clone();
let mbox = mbox.trim_end_matches("-1").to_compact_string();
let account = account.clone();
async move {
let conn = ctx.pool.get().await?;
let id = hash_thing(&mail.raw.as_slice());
let body = match mail.body {
Body::Plain(t) => t,
Body::Html(h) => parse_html(h.as_bytes(), false).0
};
conn.execute(r#"INSERT INTO emails VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (id) DO UPDATE SET box = $7"#, &[
&id,
&mail.message_id,
&mail.reply_to,
&mail.date,
&mail.raw,
&account.as_str(),
&mbox.as_str(),
&mail.from,
&mail.from_address,
&mail.subject,
&body
]).await?;
Ok(())
}
}).await
});
js.spawn(EmailIndexer::process_mbox(ctx.clone(), path.clone(), mbox, folder, account.clone()));
}
}
}
@ -239,4 +214,36 @@ impl EmailIndexer {
config: Arc::new(config)
}))
}
#[instrument(skip(ctx))]
async fn process_mbox(ctx: Arc<Ctx>, path: PathBuf, mbox: CompactString, folder: CompactString, account: CompactString) -> Result<()> {
tracing::trace!("reading mailbox");
read_mbox(&path, move |mail| {
let ctx = ctx.clone();
let account = account.clone();
let mbox = mbox.trim_end_matches("-1").to_compact_string();
async move {
let conn = ctx.pool.get().await?;
let id = hash_thing(&mail.raw.as_slice());
let body = match mail.body {
Body::Plain(t) => t,
Body::Html(h) => parse_html(h.as_bytes(), false).0
};
conn.execute(r#"INSERT INTO emails VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (id) DO UPDATE SET box = $7"#, &[
&id,
&mail.message_id,
&mail.reply_to,
&mail.date,
&mail.raw,
&account.as_str(),
&mbox.as_str(),
&mail.from,
&mail.from_address,
&mail.subject,
&body
]).await?;
Ok(())
}
}).await
}
}

View File

@ -1,13 +1,12 @@
use chrono::{DateTime, Utc};
use compact_str::{CompactString, ToCompactString};
use deadpool_postgres::{Manager, ManagerConfig, RecyclingMethod, Pool};
use futures::{StreamExt, TryFutureExt};
use indexer::{ColumnSpec, TableSpec};
use pgvector::HalfVector;
use semantic::SemanticCtx;
use tokio::signal::unix::{signal, SignalKind};
use tokio_postgres::{NoTls, Row};
use anyhow::{Context, Result};
use tracing::Instrument;
use util::{get_column_string, urlencode};
use std::collections::{BTreeMap, HashMap};
use std::{str::FromStr, sync::Arc, fmt::Write};
@ -16,6 +15,7 @@ use maud::{html, Markup, Render, DOCTYPE};
use serde::{Deserialize, Serialize};
use rs_abbreviation_number::NumericAbbreviate;
use sea_query_postgres::PostgresBinder;
use tracing_subscriber::prelude::*;
mod indexer;
mod indexers;
@ -771,7 +771,9 @@ impl ServerState {
#[tokio::main]
async fn main() -> Result<()> {
pretty_env_logger::init();
console_subscriber::ConsoleLayer::builder()
.with_default_env()
.init();
let pg_config = tokio_postgres::Config::from_str(&CONFIG.database)?;
let mgr_config = ManagerConfig { recycling_method: RecyclingMethod::Fast };
@ -779,12 +781,12 @@ async fn main() -> Result<()> {
let pool = Pool::builder(mgr).max_size(20).build()?;
let indexers: Arc<Vec<Box<dyn Indexer>>> = Arc::new(vec![
indexers::mediafiles::MediaFilesIndexer::new(CONFIG.indexers["media_files"].clone()).await?,
indexers::firefox_history_dump::FirefoxHistoryDumpIndexer::new(CONFIG.indexers["browser_history"].clone()).await?,
indexers::rclwe::RclweIndexer::new(CONFIG.indexers["rclwe"].clone()).await?,
indexers::atuin::AtuinIndexer::new(CONFIG.indexers["atuin"].clone()).await?,
indexers::miniflux::MinifluxIndexer::new(CONFIG.indexers["miniflux"].clone()).await?,
indexers::thunderbird_email::EmailIndexer::new(CONFIG.indexers["email"].clone()).await?,
indexers::mediafiles::MediaFilesIndexer::new(CONFIG.indexers["media_files"].clone()).await?,
indexers::books::BooksIndexer::new(CONFIG.indexers["books"].clone()).await?,
indexers::textfiles::TextFilesIndexer::new(CONFIG.indexers["text_files"].clone()).await?,
indexers::anki::AnkiIndexer::new(CONFIG.indexers["anki"].clone()).await?,
@ -805,7 +807,7 @@ async fn main() -> Result<()> {
for (index, sql) in indexer.schemas().iter().enumerate() {
let index = index as i32;
if index >= version {
log::info!("Migrating {} to {}.", name, index);
tracing::info!("Migrating {} to {}.", name, index);
let tx = conn.transaction().await?;
tx.batch_execute(*sql).await.context("execute migration")?;
tx.execute("INSERT INTO versions VALUES ($1, $2) ON CONFLICT (indexer) DO UPDATE SET version = $2", &[&name, &(index + 1)]).await.context("update migrations database")?;
@ -825,11 +827,10 @@ async fn main() -> Result<()> {
let ctx = Arc::new(indexer::Ctx {
pool: pool.clone()
});
log::info!("Indexing: {}.", indexer.name());
indexer.run(ctx).await.context(indexer.name())?;
log::info!("Building FTS index: {}.", indexer.name());
tracing::info!("indexing {}", indexer.name());
indexer.run(ctx).instrument(tracing::info_span!("index", indexer = indexer.name())).await.context(indexer.name())?;
tracing::info!("FTS indexing {}", indexer.name());
semantic::fts_for_indexer(indexer, sctx.clone()).await?;
log::info!("Done: {}.", indexer.name())
}
Ok(())
},
@ -856,7 +857,46 @@ async fn main() -> Result<()> {
Ok(())
}).context("init fail")
})
},
"sql" => {
println!("delete semantic indices:");
for indexer in indexers.iter() {
for table in indexer.tables() {
for column in table.columns {
if column.fts {
println!("DELETE FROM {}_{}_fts_chunks;", table.name, column.name);
println!("DROP INDEX IF EXISTS {}_{}_fts_chunks_embedding_idx;", table.name, column.name);
}
}
println!("DELETE FROM {}_change_tracker;", table.name);
println!("INSERT INTO {}_change_tracker SELECT id, CURRENT_TIMESTAMP FROM {};", table.name, table.name);
}
}
println!("create semantic indices:");
for indexer in indexers.iter() {
for table in indexer.tables() {
for column in table.columns {
if column.fts {
println!("CREATE INDEX {}_{}_fts_chunks_embedding_idx ON {}_{}_fts_chunks USING hnsw (embedding halfvec_ip_ops);", table.name, column.name, table.name, column.name);
}
}
}
}
println!("create document index:");
for indexer in indexers.iter() {
for table in indexer.tables() {
for column in table.columns {
if column.fts {
println!("CREATE INDEX {}_{}_fts_chunks_document_idx ON {}_{}_fts_chunks (document);", table.name, column.name, table.name, column.name);
}
}
}
}
Ok(())
},
_ => Ok(())
}
}
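
For reference, the new "sql" subcommand above only prints maintenance SQL derived from each indexer's table specs; it does not execute anything. For the emails table (whose subject column has fts enabled), the generated statements come out as:

DELETE FROM emails_subject_fts_chunks;
DROP INDEX IF EXISTS emails_subject_fts_chunks_embedding_idx;
DELETE FROM emails_change_tracker;
INSERT INTO emails_change_tracker SELECT id, CURRENT_TIMESTAMP FROM emails;
CREATE INDEX emails_subject_fts_chunks_embedding_idx ON emails_subject_fts_chunks USING hnsw (embedding halfvec_ip_ops);
CREATE INDEX emails_subject_fts_chunks_document_idx ON emails_subject_fts_chunks (document);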

File diff suppressed because one or more lines are too long

View File

@ -6,6 +6,7 @@ use seahash::SeaHasher;
use serde::{Serialize, Deserialize};
use tokio_postgres::Row;
use percent_encoding::{utf8_percent_encode, AsciiSet, CONTROLS};
use tracing::instrument;
const FRAGMENT: &AsciiSet = &CONTROLS.add(b' ').add(b'"').add(b'<').add(b'>').add(b'`');
@ -57,6 +58,7 @@ lazy_static::lazy_static! {
static ref SPACE_ON_NEWLINES: Regex = Regex::new(r"\n\s+").unwrap();
}
#[instrument(skip(html))]
pub fn parse_html(html: &[u8], prefer_title_tag: bool) -> (String, String) {
use html5gum::Token;
@ -135,6 +137,7 @@ pub fn parse_html(html: &[u8], prefer_title_tag: bool) -> (String, String) {
(NEWLINES.replace_all(&SPACE_ON_NEWLINES.replace_all(&text, "\n"), "\n\n").trim().to_string(), title)
}
#[instrument]
pub async fn parse_pdf(path: &PathBuf) -> Result<(String, String)> {
// Rust does not seem to have a robust library for this.
let res = tokio::process::Command::new("pdftotext")