1
0
mirror of https://github.com/osmarks/maghammer.git synced 2025-01-02 20:10:27 +00:00

initial release version

This commit is contained in:
osmarks 2024-07-18 16:28:08 +01:00
commit ab2c6fdea2
23 changed files with 7684 additions and 0 deletions

7
LICENSE Normal file
View File

@ -0,0 +1,7 @@
Copyright 2024 osmarks
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

3627
idiosyncratic-scimitar/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,45 @@
[package]
name = "idiosyncratic-scimitar"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4", "with-serde_json-1"] }
tokio = { version = "1", features = ["full"]}
chrono = { version = "0.4", features = ["serde"] }
anyhow = "1"
async-walkdir = "2"
lazy_static = "1"
compact_str = { version = "0.8.0-beta", features = ["serde"] }
seahash = "4"
toml = "0.8"
serde = { version = "1", features = ["derive"] }
reqwest = "0.12"
deadpool-postgres = "0.14"
log = "0.4"
pretty_env_logger = "0.5"
pgvector = { version = "0.3", features = ["postgres", "halfvec"] }
tokenizers = { version = "0.19", features = ["http"] }
regex = "1"
futures = "0.3"
html5gum = "0.5"
async-trait = "0.1"
num_cpus = "1"
epub = "2"
tokio-stream = { version = "0.1", features = ["fs"] }
rusqlite = "0.31"
serde_json = "1"
rmp-serde = "1"
serde_bytes = "0.11"
half = "2"
ntex = { version = "2", features = ["tokio"] }
maud = "0.26"
percent-encoding = "2"
rs-abbreviation-number = "0.3"
ntex-files = "2"
derive_more = "0.99"
im = { version = "15", features = ["serde"] }
sea-query = { version = "0.30", features = ["backend-postgres", "postgres-array"] }
sea-query-postgres = { version = "0.4", features = ["postgres-array"] }
ulid = { version = "1", features = ["serde"] }
mail-parser = "0.9"

View File

@ -0,0 +1,59 @@
import whisperx
import sys
import time
import sqlite3
import psycopg2
# WhisperX inference settings: run on the GPU in half precision.
device = "cuda"
batch_size = 16
compute_type = "float16"
model = whisperx.load_model("large-v2", device, compute_type=compute_type, language="en")
model_a, metadata = whisperx.load_align_model(language_code="en", device=device)
print("Models loaded.")
# Paths read from media_files.path are appended to this prefix.
BASE = "/media/"
# Two connections: `conn` streams the pending rows, `conn2` writes and commits results.
conn = psycopg2.connect("dbname=maghammer user=maghammer")
conn2 = psycopg2.connect("dbname=maghammer user=maghammer")
csr = conn.cursor()
# The write cursor must belong to conn2: conn2.commit() is the only commit in
# this script, so a cursor opened on `conn` would never have its UPDATEs committed.
csr2 = conn2.cursor()
csr.execute("SELECT id, path FROM media_files WHERE auto_subs_state = 1") # PENDING
def format_duration(seconds):
    """Render a duration in seconds as zero-padded ``HH:MM:SS``.

    Fractions of a second are truncated, not rounded.
    """
    whole_hours = int(seconds / 3600.0)
    remainder = seconds - 3600.0 * whole_hours
    whole_minutes = int(remainder / 60.0)
    remainder = remainder - 60.0 * whole_minutes
    return "{:02}:{:02}:{:02}".format(whole_hours, whole_minutes, int(remainder))
# Transcribe every pending media file and store the generated subtitles.
# NOTE(review): the indentation of this script was lost in SOURCE; this assumes
# the UPDATE runs for files that failed to load too (with empty subtitles),
# which is why `subs` is initialised before the load attempt - confirm.
while row := csr.fetchone():
    docid, file = row[0], row[1]
    start = time.time()
    subs = ""
    audio = None
    try:
        audio = whisperx.load_audio(BASE + file)
    except Exception as e:
        # Unreadable file: report it and fall through with empty subtitles.
        print(e)
    if audio is not None:
        loaded = time.time()
        result = model.transcribe(audio, batch_size=batch_size)
        transcribed = time.time()
        result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)
        aligned = time.time()
        segments = result["segments"]
        # The segment count is taken from a local: nesting double quotes inside a
        # double-quoted f-string is a SyntaxError before Python 3.12.
        print(f"{file} x{len(segments)} load={loaded-start:1f}s transcribe={transcribed - loaded:1f}s align={aligned - transcribed:1f}s")
        subs = "\n".join(
            f"[{format_duration(seg['start'])} -> {format_duration(seg['end'])}]: {seg['text'].strip()}"
            for seg in segments
        ).strip()
    csr2.execute("UPDATE media_files SET subs = %s, auto_subs_state = 2 WHERE id = %s", (subs, docid)) # GENERATED
    conn2.commit()

View File

@ -0,0 +1,57 @@
database = "host=localhost port=5432 user=maghammer dbname=maghammer"
concurrency = 8
[semantic]
backend = "http://100.64.0.10:1706"
embedding_dim = 1024
tokenizer = "Snowflake/snowflake-arctic-embed-l"
max_tokens = 510
batch_size = 256
[indexers.text_files]
path = "/data/archive"
base_url = "https://media.osmarks.net/Archive/"
[indexers.books]
path = "/data/calibre"
[indexers.media_files]
path = "./mediatest"
ignore_regexes = [
"virgo inbound/.*",
"Large Thing Backups/.*",
"Sam's Laser FAQ/.*",
"Video Docs/.*",
"Thumbnails/.*"
]
ignore_metadata_regexes = [
"GalaxyTV",
"x26[45]"
]
base_url = "https://media.osmarks.net/"
[indexers.anki]
db_path = "/data/anki/collections/osmarks/collection.anki2"
[indexers.minoteaur]
db_path = "/data/minoteaur/minoteaur.sqlite3"
base_url = "https://notes.osmarks.net/"
[indexers.email]
account_mapping = { "imap.purelymail.com" = "personal", "outlook.office365.com" = "university" }
mboxes_path = "/home/osmarks/.thunderbird/3nacdt54.default-release/ImapMail/"
ignore_mboxes = ["Junk", "nstmp"]
[indexers.miniflux]
source_database = "host=localhost dbname=miniflux user=miniflux"
[indexers.atuin]
db_path = "/home/osmarks/.local/share/atuin/history.db"
[indexers.rclwe]
input = "rclwe_dump"
output = "rclwe_dump2"
[indexers.browser_history]
db_path = "/data/archive/lthist.sqlite3"
include_untimestamped_records = false

View File

@ -0,0 +1,118 @@
import torch
import time
import threading
from aiohttp import web
import aiohttp
import asyncio
import traceback
import umsgpack
import gc
import collections
import queue
import io
from sentence_transformers import SentenceTransformer
from prometheus_client import Counter, Histogram, REGISTRY, generate_latest
# Load the embedding model once at import time, in half precision on the first CUDA device.
device = torch.device("cuda:0")
model_name = "./snowflake-arctic-embed-l"
model = SentenceTransformer(model_name).half().to(device)
model.eval()
print("model loaded")
# Value of the "model" label attached to the Prometheus metrics below.
MODELNAME = "sbert-snowflake-arctic-embed-l"
# Maximum batch size; do_inference asserts that a request does not exceed it.
BS = 256
# One queued unit of work: the tokenized batch plus the callback that receives (ok, payload).
InferenceParameters = collections.namedtuple("InferenceParameters", ["text", "callback"])
items_ctr = Counter("modelserver_total_items", "Items run through model server", ["model", "modality"])
inference_time_hist = Histogram("modelserver_inftime", "Time running inference", ["model", "batch_size"])
batch_count_ctr = Counter("modelserver_batchcount", "Inference batches run", ["model"])
# Inference only: turn autograd off globally.
torch.set_grad_enabled(False)
def do_inference(params: InferenceParameters):
    """Run one tokenized batch through the model and report the outcome.

    On success the callback receives ``(True, embeddings)``, where the embeddings
    are L2-normalised and moved to a NumPy array; on any exception it receives
    ``(False, message)``. The CUDA cache is emptied afterwards either way.
    """
    with torch.no_grad():
        try:
            tokens, on_done = params
            n_items = tokens["input_ids"].shape[0]
            assert n_items <= BS, f"max batch size is {BS}"
            items_ctr.labels(MODELNAME, "text").inc(n_items)
            timer = inference_time_hist.labels(MODELNAME, n_items).time()
            with timer:
                embeddings = model(tokens)["sentence_embedding"]
                embeddings /= embeddings.norm(dim=-1, keepdim=True)
                embeddings = embeddings.cpu().numpy()
            batch_count_ctr.labels(MODELNAME).inc()
            on_done(True, embeddings)
        except Exception as exc:
            traceback.print_exc()
            callback(False, str(exc))
        finally:
            torch.cuda.empty_cache()
# Bounded hand-off between the HTTP handlers and the inference worker.
q = queue.Queue(maxsize=10)
def infer_thread():
    """Worker loop: take queued batches one at a time and run them, forever."""
    while True:
        job = q.get()
        do_inference(job)
# Accept request bodies of up to 64 MiB.
app = web.Application(client_max_size=2**26)
routes = web.RouteTableDef()
@routes.post("/")
async def run_inference(request):
    """Embed the texts in a msgpack request body.

    The body is a msgpack map whose "text" entry is passed to the model's
    tokenizer. The tokenized batch is queued for the inference thread and the
    handler waits for its callback.

    Responses (all msgpack):
      200 - list of float16 embedding byte strings, one per result row
      500 - error message string when inference failed
      503 - error message string when the inference queue is full
    """
    # Inside a coroutine the running loop is the one the worker must wake.
    loop = asyncio.get_running_loop()
    data = umsgpack.loads(await request.read())
    event = asyncio.Event()
    results = None
    def callback(*argv):
        # Called from the inference thread: store the result, then set the
        # event on the event loop's own thread.
        nonlocal results
        results = argv
        loop.call_soon_threadsafe(event.set)
    tokenized = model.tokenize(data["text"])
    tokenized = { k: v.to(device) for k, v in tokenized.items() }
    try:
        q.put_nowait(InferenceParameters(tokenized, callback))
    except queue.Full:
        # Report back-pressure explicitly rather than as an unhandled exception.
        return web.Response(body=umsgpack.dumps("inference queue full"), status=503, content_type="application/msgpack")
    await event.wait()
    body_data = results[1]
    if results[0]:
        status = 200
        body_data = [x.astype("float16").tobytes() for x in body_data]
    else:
        status = 500
        print(results[1])
    return web.Response(body=umsgpack.dumps(body_data), status=status, content_type="application/msgpack")
@routes.get("/config")
async def config(request):
    """Describe the served model (name, batch limit, embedding size, tokenizer) as msgpack."""
    description = {
        "model": model_name,
        "batch": BS,
        "embedding_size": model.get_sentence_embedding_dimension(),
        "tokenizer": "bert-based-uncased"
    }
    payload = umsgpack.dumps(description)
    return web.Response(body=payload, status=200, content_type="application/msgpack")
@routes.get("/")
async def health(request):
    """Liveness probe: always answers 204 with an empty body."""
    no_content = 204
    return web.Response(status=no_content)
@routes.get("/metrics")
async def metrics(request):
    """Serve the current contents of the default Prometheus registry."""
    exposition = generate_latest(REGISTRY)
    return web.Response(body=exposition)
app.router.add_routes(routes)
async def run_webserver():
    """Start the aiohttp application on TCP port 1706 (empty host string)."""
    app_runner = web.AppRunner(app)
    await app_runner.setup()
    listener = web.TCPSite(app_runner, "", 1706)
    print("server starting")
    await listener.start()
# Entry point: start the inference worker, then serve HTTP until interrupted.
try:
    # daemon=True: the worker blocks forever in q.get(), so a non-daemon thread
    # would keep the interpreter alive after sys.exit() and Ctrl-C would hang.
    th = threading.Thread(target=infer_thread, daemon=True)
    th.start()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(run_webserver())
    loop.run_forever()
except KeyboardInterrupt:
    print("quitting")
    import sys
    sys.exit(0)

View File

@ -0,0 +1,33 @@
use derive_more::Display;
/// Application error type; each wrapping variant has a matching `From` impl below.
#[derive(Debug, Display)]
pub enum Error {
/// Any other failure, carried as an `anyhow::Error`.
Internal(anyhow::Error),
/// An error returned by `tokio_postgres`.
PGError(tokio_postgres::Error),
/// An error from the `deadpool_postgres` connection pool.
PGPoolError(deadpool_postgres::PoolError),
/// The requested item does not exist.
NotFound
}
impl std::error::Error for Error {}
impl From<anyhow::Error> for Error {
fn from(value: anyhow::Error) -> Self {
Self::Internal(value)
}
}
impl From<tokio_postgres::Error> for Error {
fn from(value: tokio_postgres::Error) -> Self {
Self::PGError(value)
}
}
impl From<deadpool_postgres::PoolError> for Error {
fn from(value: deadpool_postgres::PoolError) -> Self {
Self::PGPoolError(value)
}
}
impl ntex::web::error::WebResponseError for Error {}
pub type EResult<T> = Result<T, Error>;

View File

@ -0,0 +1,75 @@
use std::{collections::HashSet, sync::Arc};
use anyhow::{Context, Result};
use compact_str::CompactString;
use deadpool_postgres::Pool;
use futures::{pin_mut, TryStreamExt};
use crate::util::hash_str;
/// Static description of one column of an indexed table.
// NOTE(review): the flag meanings below are inferred from their names; the code
// that consumes them is not in this file - confirm.
#[derive(Default)]
pub struct ColumnSpec {
/// Column name in the database table.
pub name: &'static str,
/// Presumably: include the column in full-text search.
pub fts: bool,
/// Presumably: treat the column as short text for full-text search purposes.
pub fts_short: bool,
/// Presumably: enable trigram matching on the column.
pub trigram: bool,
/// Whether the column is an array type (set for the `TEXT[]` columns of `books`).
pub is_array: bool
}
/// Static description of one table owned by an indexer.
pub struct TableSpec {
/// Table name.
pub name: &'static str,
/// `(column in this table, parent table name)`, e.g. `("book", "books")` for `chapters`.
pub parent: Option<(&'static str, &'static str)>,
/// Columns described by a `ColumnSpec`.
pub columns: &'static [ColumnSpec],
/// Column whose content is used to build a URL, if any - presumably via `Indexer::url_for`.
pub url_source_column: Option<&'static str>,
/// Column used as the record's title.
pub title_source_column: &'static str,
/// Columns included in a record's summary.
pub summary_columns: &'static [&'static str]
}
/// A table spec is usable as a `sea_query` identifier: it writes its table name.
impl sea_query::Iden for &TableSpec {
    fn unquoted(&self, s: &mut dyn std::fmt::Write) {
        s.write_str(self.name).unwrap();
    }
}

/// A column spec is usable as a `sea_query` identifier: it writes its column name.
impl sea_query::Iden for &ColumnSpec {
    fn unquoted(&self, s: &mut dyn std::fmt::Write) {
        s.write_str(self.name).unwrap();
    }
}
impl TableSpec {
    /// Returns the spec of the column called `name`, or `None` if this table
    /// does not describe such a column.
    pub fn column(&self, name: &str) -> Option<&'static ColumnSpec> {
        let columns: &'static [ColumnSpec] = self.columns;
        for spec in columns {
            if spec.name == name {
                return Some(spec);
            }
        }
        None
    }
}
/// Shared state handed to every indexer run.
pub struct Ctx {
/// Postgres connection pool.
pub pool: Pool
}
/// A data source that can be indexed into Postgres.
#[async_trait::async_trait]
pub trait Indexer: Sync + Send {
/// Runs one indexing pass, taking database connections from `ctx`.
async fn run(&self, ctx: Arc<Ctx>) -> Result<()>;
/// Short identifier of the indexer (e.g. `"anki"`, `"books"`).
fn name(&self) -> &'static str;
/// SQL statements that create this indexer's tables.
fn schemas(&self) -> &'static [&'static str];
/// Descriptions of the tables this indexer owns.
fn tables(&self) -> &'static [TableSpec];
/// Builds a URL for a record of `table` from a column's content.
// NOTE(review): presumably called with the table's `url_source_column` value - confirm against callers.
fn url_for(&self, table: &str, column_content: &str) -> String;
/// Looks up one of this indexer's tables by name.
fn table(&self, name: &str) -> Option<&'static TableSpec> {
self.tables().iter().find(|x| x.name == name)
}
}
/// Deletes database records whose file path is not in `existing`.
///
/// `select_paths` must return the stored path as its first column;
/// `delete_by_id` receives `hash_str(path)` as its only parameter.
pub async fn delete_nonexistent_files(ctx: Arc<Ctx>, select_paths: &str, delete_by_id: &str, existing: &HashSet<CompactString>) -> Result<()> {
    let conn = ctx.pool.get().await?;
    let rows = conn.query_raw(select_paths, [""; 0]).await?;
    pin_mut!(rows);
    while let Some(row) = rows.try_next().await? {
        let stored_path = CompactString::from(row.get::<_, String>(0));
        if existing.contains(&stored_path) {
            continue;
        }
        conn.execute(delete_by_id, &[&hash_str(&stored_path)]).await?;
    }
    Ok(())
}

View File

@ -0,0 +1,121 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use crate::indexer::{Ctx, Indexer, TableSpec, ColumnSpec};
use chrono::prelude::*;
use rusqlite::OpenFlags;
/// Configuration of the Anki indexer.
#[derive(Serialize, Deserialize)]
struct Config {
/// Path of the Anki collection SQLite database (opened read-only).
db_path: String
}
/// Indexes Anki notes into the `cards` table.
#[derive(Clone)]
pub struct AnkiIndexer {
// Shared so the indexer can be cloned cheaply into the blocking reader task.
config: Arc<Config>
}
#[async_trait::async_trait]
impl Indexer for AnkiIndexer {
fn name(&self) -> &'static str {
"anki"
}
fn schemas(&self) -> &'static [&'static str] {
&[r#"
CREATE TABLE cards (
id BIGINT PRIMARY KEY,
front TEXT NOT NULL,
back TEXT NOT NULL,
timestamp TIMESTAMPTZ
);
"#]
}
fn tables(&self) -> &'static [TableSpec] {
&[
TableSpec {
name: "cards",
parent: None,
columns: &[
ColumnSpec {
name: "front",
fts: true,
fts_short: false,
trigram: false,
is_array: false
},
ColumnSpec {
name: "back",
fts: true,
fts_short: false,
trigram: false,
is_array: false
}
],
url_source_column: None,
title_source_column: "front",
summary_columns: &["front", "back", "timestamp"]
}
]
}
/// Copies notes from the Anki database into `cards`, replacing a row only
/// when the note's modification time is newer than the stored timestamp.
async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
// SQLite is read on a blocking thread; rows arrive through a bounded channel.
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let self = self.clone();
let bg = tokio::task::spawn_blocking(move || self.read_database(tx));
let mut conn = ctx.pool.get().await?;
while let Some((id, fields, modtime)) = rx.recv().await {
// The fields string is split at the first 0x1F byte into front and back.
let (front, back): (&str, &str) = fields.split_once('\x1F').context("parse fail")?;
// `modtime` is interpreted as whole seconds since the Unix epoch.
let modtime = DateTime::from_timestamp(modtime, 0).context("parse fail")?;
let row = conn.query_opt("SELECT timestamp FROM cards WHERE id = $1", &[&id]).await?;
if let Some(row) = row {
let last_modtime: DateTime<Utc> = row.get(0);
// Already stored and not modified since: skip.
if last_modtime >= modtime {
continue;
}
}
// Replace the row atomically: delete and insert in one transaction.
let tx = conn.transaction().await?;
tx.execute("DELETE FROM cards WHERE id = $1", &[&id]).await?;
tx.execute("INSERT INTO cards VALUES ($1, $2, $3, $4)",
&[&id, &front, &back, &modtime])
.await?;
tx.commit().await?;
}
// TODO at some point: delete removed notes
// Surface both a panic in the reader task and an error it returned.
bg.await??;
Ok(())
}
/// Not implemented for this indexer: calling it panics.
fn url_for(&self, _table: &str, _column_content: &str) -> String {
unimplemented!()
}
}
impl AnkiIndexer {
    /// Builds the indexer from its TOML configuration table.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let parsed: Config = config.try_into()?;
        let indexer = AnkiIndexer {
            config: Arc::new(parsed)
        };
        Ok(Box::new(indexer))
    }

    /// Streams `(id, flds, mod)` for every row of `notes` into `target`.
    ///
    /// Blocking: runs SQLite queries and `blocking_send`, so it must be called
    /// from a blocking thread, not from async code.
    fn read_database(&self, target: tokio::sync::mpsc::Sender<(i64, String, i64)>) -> Result<()> {
        let conn = rusqlite::Connection::open_with_flags(&self.config.db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
        let mut notes_stmt = conn.prepare("SELECT id, flds, mod FROM notes")?;
        let notes = notes_stmt.query_map([], |row| {
            Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?, row.get::<_, i64>(2)?))
        })?;
        for note in notes {
            target.blocking_send(note?)?;
        }
        Ok(())
    }
}

View File

@ -0,0 +1,125 @@
use std::sync::Arc;
use anyhow::Result;
use futures::pin_mut;
use serde::{Deserialize, Serialize};
use crate::{indexer::{ColumnSpec, Ctx, Indexer, TableSpec}, util::hash_str};
use chrono::prelude::*;
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
use rusqlite::OpenFlags;
/// Configuration of the Atuin shell-history indexer.
#[derive(Serialize, Deserialize)]
struct Config {
/// Path of the Atuin history SQLite database (opened read-only).
db_path: String
}
/// Indexes Atuin shell history into the `shell_history` table.
#[derive(Clone)]
pub struct AtuinIndexer {
// Shared so the indexer can be cloned cheaply into the blocking reader task.
config: Arc<Config>
}
#[async_trait::async_trait]
impl Indexer for AtuinIndexer {
fn name(&self) -> &'static str {
"atuin"
}
fn schemas(&self) -> &'static [&'static str] {
&[r#"
CREATE TABLE shell_history (
id BIGINT PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
command TEXT NOT NULL,
cwd TEXT,
duration BIGINT,
exit INTEGER,
hostname TEXT NOT NULL,
session TEXT NOT NULL
);
"#]
}
fn tables(&self) -> &'static [TableSpec] {
&[
TableSpec {
name: "shell_history",
parent: None,
columns: &[
ColumnSpec {
name: "command",
fts: true,
fts_short: false,
trigram: false,
is_array: false
},
],
url_source_column: None,
title_source_column: "command",
summary_columns: &["timestamp", "hostname", "command", "cwd", "duration", "exit"]
}
]
}
/// Appends history entries newer than the newest stored one to
/// `shell_history`, using a binary COPY.
async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
let self = self.clone();
let conn = ctx.pool.get().await?;
let max_timestamp = conn.query_one("SELECT max(timestamp) FROM shell_history", &[]).await?.get::<_, Option<DateTime<Utc>>>(0);
let max_timestamp = max_timestamp.map(|x| x.timestamp_nanos_opt().unwrap() + 999).unwrap_or(0); // the unwrap panics once nanosecond timestamps overflow i64 (year 2262); 999 ns are added so source rows in the same microsecond as the newest stored row are not re-read (Postgres timestamps have microsecond resolution)
// SQLite is read on a blocking thread; rows arrive through a bounded channel.
let bg = tokio::task::spawn_blocking(move || self.read_database(tx, max_timestamp));
let sink = conn.copy_in("COPY shell_history (id, timestamp, command, cwd, duration, exit, hostname, session) FROM STDIN BINARY").await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::TIMESTAMPTZ, Type::TEXT, Type::TEXT, Type::INT8, Type::INT4, Type::TEXT, Type::TEXT]);
pin_mut!(writer);
while let Some((id, timestamp, duration, exit, command, cwd, session, hostname)) = rx.recv().await {
let id = hash_str(&id);
let timestamp = DateTime::from_timestamp_nanos(timestamp);
// -1 and "unknown" are sentinel values in the source data; store them as NULL.
let duration = if duration != -1 { Some(duration) } else { None };
let exit = if exit != -1 { Some(exit) } else { None };
let cwd = if cwd != "unknown" { Some(cwd) } else { None };
// Values are written in the column order of the COPY statement above.
writer.as_mut().write(&[&id, &timestamp, &command, &cwd, &duration, &exit, &hostname, &session]).await?;
}
writer.finish().await?;
// Surface both a panic in the reader task and an error it returned.
bg.await??;
Ok(())
}
/// Not implemented for this indexer: calling it panics.
fn url_for(&self, _table: &str, _column_content: &str) -> String {
unimplemented!()
}
}
impl AtuinIndexer {
    /// Builds the indexer from its TOML configuration table.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let parsed: Config = config.try_into()?;
        let indexer = AtuinIndexer {
            config: Arc::new(parsed)
        };
        Ok(Box::new(indexer))
    }

    /// Streams every `history` row with `timestamp > min_timestamp`, oldest
    /// first, into `target` as
    /// `(id, timestamp, duration, exit, command, cwd, session, hostname)`.
    ///
    /// Blocking: must run on a blocking thread, not in async code.
    fn read_database(&self, target: tokio::sync::mpsc::Sender<(String, i64, i64, i32, String, String, String, String)>, min_timestamp: i64) -> Result<()> {
        let conn = rusqlite::Connection::open_with_flags(&self.config.db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
        let mut fetch_stmt = conn.prepare("SELECT id, timestamp, duration, exit, command, cwd, session, hostname FROM history WHERE timestamp > ? ORDER BY timestamp ASC")?;
        for entry in fetch_stmt.query_map([min_timestamp], |row| {
            Ok((
                row.get(0)?,
                row.get(1)?,
                row.get(2)?,
                row.get(3)?,
                row.get(4)?,
                row.get(5)?,
                row.get(6)?,
                row.get(7)?,
            ))
        })? {
            target.blocking_send(entry?)?;
        }
        Ok(())
    }
}

View File

@ -0,0 +1,282 @@
use std::collections::HashSet;
use std::ffi::OsStr;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, Result};
use async_walkdir::{WalkDir, DirEntry};
use compact_str::{CompactString, ToCompactString};
use epub::doc::EpubDoc;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::util::{hash_str, parse_html, parse_date, systemtime_to_utc, CONFIG};
use crate::indexer::{delete_nonexistent_files, Ctx, Indexer, TableSpec, ColumnSpec};
use chrono::prelude::*;
use std::str::FromStr;
/// Configuration of the books indexer.
#[derive(Serialize, Deserialize, Clone)]
struct Config {
/// Directory that is walked for `.epub` files.
path: String
}
/// Indexes EPUB files into the `books` and `chapters` tables.
pub struct BooksIndexer {
config: Config
}
/// Metadata and chapter text extracted from one EPUB by `parse_epub`.
#[derive(Clone, Debug)]
pub struct EpubParse {
/// First "title" metadata entry.
title: String,
/// All "creator" metadata entries (empty if absent).
author: Vec<String>,
/// First "description" metadata entry, passed through `parse_html`.
description: String,
/// First "calibre:series" metadata entry, if any.
series: Option<String>,
/// "calibre:series_index" metadata; only kept when `series` is present.
series_index: Option<f32>,
/// Parsed from the first "date" metadata entry.
publication_date: DateTime<Utc>,
/// `(content, title)` pairs, one per distinct file referenced by the table of contents.
chapters: Vec<(String, String)>,
/// All "subject" metadata entries (empty if absent).
tags: Vec<String>
}
/// Reads an EPUB file and extracts its metadata and chapter text.
///
/// # Errors
/// Fails if the file cannot be opened as an EPUB, if it has no "title" or
/// "date" metadata, if the date cannot be parsed, or if a table-of-contents
/// entry points at a path that is not valid UTF-8 or at a missing resource.
pub fn parse_epub(path: &PathBuf) -> Result<EpubParse> {
    let mut epub = EpubDoc::new(path).context("initial doc load")?;

    let raw_description = epub.metadata.get("description")
        .and_then(|values| values.first())
        .map(|value| value.as_str())
        .unwrap_or_default();
    let description = parse_html(raw_description.as_bytes(), false).0;

    let publication_date = epub.metadata.get("date").context("invalid book (no date)")?.get(0).context("invalid book")?;
    let publication_date = parse_date(&publication_date)?;

    let series = epub.metadata.get("calibre:series").and_then(|values| values.first()).cloned();
    let series_index = epub.metadata.get("calibre:series_index")
        .and_then(|values| values.first())
        .and_then(|value| f32::from_str(value.as_str()).ok());
    // A series index without a series name is meaningless, so it is dropped.
    let series_index = series.as_ref().and(series_index);

    let title = epub.metadata.get("title").context("invalid book (no title)")?.get(0).context("invalid book")?.to_string();
    let author = epub.metadata.get("creator").cloned().unwrap_or_default();
    let tags = epub.metadata.get("subject").cloned().unwrap_or_default();

    let mut parse = EpubParse {
        title,
        author,
        description,
        series_index,
        series,
        publication_date,
        chapters: vec![],
        tags
    };

    let mut seen = HashSet::new();
    for navpoint in epub.toc.clone() {
        let target = navpoint.content;
        let target = target.to_str().context("invalid path")?;
        // Drop any "#fragment" so that links into the same file compare equal.
        let file = target.split_once('#').map_or(target, |(before, _fragment)| before);
        // It's valid for a single XHTML file in a book to contain multiple semantic "chapters".
        // Splitting at the link target would be annoying, so we discard repeats like that.
        if seen.contains(file) {
            continue;
        }
        seen.insert(file.to_string());
        let resource = epub.get_resource_str_by_path(file).with_context(|| format!("resource {} nonexistent", file))?;
        let (content, _) = parse_html(resource.as_bytes(), true);
        parse.chapters.push((content, navpoint.label.clone()));
    }
    Ok(parse)
}
/// Indexes a single EPUB file if it changed since it was last stored.
///
/// `relpath` is recorded in `existing_files`. When the file's modification
/// time is newer than the stored timestamp, the book row and its chapters are
/// replaced in one transaction. A file that fails to parse is logged and
/// skipped without returning an error.
async fn handle_epub(relpath: CompactString, ctx: Arc<Ctx>, entry: DirEntry, existing_files: Arc<RwLock<HashSet<CompactString>>>) -> Result<()> {
    let epub_path = entry.path();
    let mut conn = ctx.pool.get().await?;
    let metadata = entry.metadata().await?;
    let stored = conn.query_opt("SELECT timestamp FROM books WHERE id = $1", &[&hash_str(&relpath)]).await?;
    existing_files.write().await.insert(relpath.clone());
    let timestamp: DateTime<Utc> = stored.map(|r| r.get(0)).unwrap_or(DateTime::<Utc>::MIN_UTC);
    let modtime = systemtime_to_utc(metadata.modified()?)?;
    if modtime <= timestamp {
        // Unchanged since the last run.
        return Result::Ok(());
    }

    // EPUB parsing is synchronous, so it runs on the blocking pool.
    let parsed = tokio::task::spawn_blocking(move || parse_epub(&epub_path)).await?;
    let book = match parsed {
        Ok(book) => book,
        Err(e) => {
            log::warn!("Failed parse for {}: {}", relpath, e);
            return Ok(())
        }
    };

    let tx = conn.transaction().await?;
    let id = hash_str(&relpath);
    // Deleting the book also removes its chapters (ON DELETE CASCADE).
    tx.execute("DELETE FROM books WHERE id = $1", &[&id]).await?;
    tx.execute("INSERT INTO books VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)", &[
        &id,
        &relpath.as_str(),
        &book.title,
        &book.author,
        &book.description,
        &book.series,
        &book.series_index,
        &book.publication_date,
        &book.tags,
        &modtime
    ]).await?;
    for (ix, (content, title)) in book.chapters.into_iter().enumerate() {
        let chapter_index = ix as i16;
        tx.execute("INSERT INTO chapters (book, chapter_index, chapter_title, content) VALUES ($1, $2, $3, $4)", &[
            &id,
            &chapter_index,
            &title,
            &content
        ]).await?;
    }
    tx.commit().await?;
    Result::Ok(())
}
#[async_trait::async_trait]
impl Indexer for BooksIndexer {
    fn name(&self) -> &'static str {
        "books"
    }

    fn schemas(&self) -> &'static [&'static str] {
        &[r#"
CREATE TABLE books (
id BIGINT PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
title TEXT NOT NULL,
author TEXT[] NOT NULL,
description TEXT NOT NULL,
series TEXT,
series_index REAL,
publication_date TIMESTAMPTZ,
tags TEXT[],
timestamp TIMESTAMPTZ
);
CREATE TABLE chapters (
id BIGSERIAL PRIMARY KEY,
book BIGINT NOT NULL REFERENCES books (id) ON DELETE CASCADE,
chapter_index SMALLINT NOT NULL,
chapter_title TEXT NOT NULL,
content TEXT NOT NULL
);
"#]
    }

    fn tables(&self) -> &'static [TableSpec] {
        &[
            TableSpec {
                name: "books",
                parent: None,
                columns: &[
                    ColumnSpec {
                        name: "title",
                        fts: true,
                        fts_short: true,
                        trigram: true,
                        is_array: false
                    },
                    ColumnSpec {
                        name: "author",
                        fts: false,
                        fts_short: false,
                        trigram: true,
                        is_array: true
                    },
                    ColumnSpec {
                        name: "description",
                        fts: true,
                        fts_short: false,
                        trigram: false,
                        is_array: false
                    },
                    ColumnSpec {
                        name: "tags",
                        fts: false,
                        fts_short: false,
                        trigram: true,
                        is_array: true
                    }
                ],
                url_source_column: None,
                title_source_column: "title",
                summary_columns: &["title", "author", "tags", "series", "series_index", "publication_date"]
            },
            TableSpec {
                name: "chapters",
                parent: Some(("book", "books")),
                columns: &[
                    ColumnSpec {
                        name: "chapter_title",
                        fts: true,
                        fts_short: true,
                        trigram: true,
                        is_array: false
                    },
                    ColumnSpec {
                        name: "content",
                        fts: true,
                        fts_short: false,
                        trigram: false,
                        is_array: false
                    }
                ],
                url_source_column: None,
                title_source_column: "chapter_title",
                summary_columns: &["chapter_index", "chapter_title"]
            }
        ]
    }

    /// Walks the configured directory, (re)indexes every `.epub` file whose
    /// modification time is newer than its stored timestamp, then deletes
    /// book rows whose file was not seen during the walk.
    async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
        let existing_files = Arc::new(RwLock::new(HashSet::new()));
        let entries = WalkDir::new(&self.config.path);
        let base_path = &self.config.path;
        entries.map_err(|e| anyhow::Error::from(e)).try_for_each_concurrent(Some(CONFIG.concurrency), |entry|
        {
            let ctx = ctx.clone();
            let existing_files = existing_files.clone();
            async move {
                let real_path = entry.path();
                // Paths that are not valid UTF-8 are skipped silently.
                let path = if let Some(path) = real_path.strip_prefix(base_path)?.to_str() {
                    path
                } else {
                    return Result::Ok(());
                };
                let ext = real_path.extension().and_then(OsStr::to_str);
                if !entry.file_type().await?.is_file() || "epub" != ext.unwrap_or_default() {
                    return Ok(());
                }
                existing_files.write().await.insert(CompactString::from(path));
                let metadata = entry.metadata().await?;
                // The pooled connection is confined to this block so it is back
                // in the pool before handle_epub checks out its own. Holding it
                // across that call makes every task need two connections at
                // once, which can exhaust the pool and deadlock the walk.
                let timestamp: DateTime<Utc> = {
                    let conn = ctx.pool.get().await?;
                    let row = conn.query_opt("SELECT timestamp FROM books WHERE id = $1", &[&hash_str(path)]).await?;
                    row.map(|r| r.get(0)).unwrap_or(DateTime::<Utc>::MIN_UTC)
                };
                let modtime = systemtime_to_utc(metadata.modified()?)?;
                if modtime > timestamp {
                    // `path` is already the UTF-8 path relative to `base_path`.
                    let relpath = path.to_compact_string();
                    handle_epub(relpath, ctx, entry, existing_files).await
                } else {
                    Ok(())
                }
            }
        }).await?;

        {
            let existing = existing_files.read().await;
            delete_nonexistent_files(ctx, "SELECT path FROM books", "DELETE FROM books WHERE id = $1", &existing).await?;
        }

        Ok(())
    }

    /// Not implemented for this indexer: calling it panics.
    fn url_for(&self, _table: &str, _column_content: &str) -> String {
        unimplemented!()
    }
}
impl BooksIndexer {
    /// Builds the indexer from its TOML configuration table.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let parsed: Config = config.try_into()?;
        let indexer = BooksIndexer { config: parsed };
        Ok(Box::new(indexer))
    }
}

View File

@ -0,0 +1,196 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use futures::pin_mut;
use serde::{Deserialize, Serialize};
use crate::{indexer::{ColumnSpec, Ctx, Indexer, TableSpec}, util::hash_str};
use chrono::prelude::*;
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
use rusqlite::OpenFlags;
use tokio::sync::mpsc::Sender;
/// Configuration of the browser-history indexer.
#[derive(Serialize, Deserialize)]
struct Config {
/// Path of the history dump SQLite database (opened read-only).
db_path: String,
/// Whether places without a `last_visit_date` should be imported as well.
include_untimestamped_records: bool
}
/// Indexes a browser history dump into `browser_places`, `bookmarks` and `history`.
#[derive(Clone)]
pub struct FirefoxHistoryDumpIndexer {
// Shared so the indexer can be cloned cheaply into the blocking reader task.
config: Arc<Config>
}
#[async_trait::async_trait]
impl Indexer for FirefoxHistoryDumpIndexer {
fn name(&self) -> &'static str {
"browser_history"
}
fn schemas(&self) -> &'static [&'static str] {
&[r#"
CREATE TABLE browser_places (
id BIGINT PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
url TEXT NOT NULL,
title TEXT,
visit_count BIGINT NOT NULL,
description TEXT,
preview_image TEXT
);
CREATE TABLE bookmarks (
id BIGINT PRIMARY KEY,
place BIGINT NOT NULL REFERENCES browser_places(id),
title TEXT NOT NULL,
added TIMESTAMPTZ NOT NULL,
timestamp TIMESTAMPTZ NOT NULL
);
CREATE TABLE history (
id BIGINT PRIMARY KEY,
place BIGINT NOT NULL REFERENCES browser_places(id),
timestamp TIMESTAMPTZ NOT NULL,
ty INTEGER NOT NULL
);
"#]
}
fn tables(&self) -> &'static [TableSpec] {
&[
TableSpec {
name: "browser_places",
parent: None,
columns: &[
ColumnSpec {
name: "title",
fts: true,
fts_short: false,
trigram: true,
is_array: false
},
ColumnSpec {
name: "description",
fts: true,
fts_short: false,
trigram: false,
is_array: false
}
],
url_source_column: Some("url"),
title_source_column: "title",
summary_columns: &["url", "timestamp", "title", "visit_count", "description"]
},
TableSpec {
name: "bookmarks",
columns: &[
ColumnSpec {
name: "title",
fts: true,
fts_short: false,
trigram: true,
is_array: false
}
],
url_source_column: None,
parent: Some(("place", "browser_places")),
title_source_column: "title",
summary_columns: &["title", "added", "timestamp"]
},
TableSpec {
name: "history",
parent: Some(("place", "browser_places")),
title_source_column: "id",
columns: &[],
summary_columns: &["timestamp", "ty"],
url_source_column: None
}
]
}
/// Imports places, bookmarks and visits newer than the newest stored row of
/// each table. Places and bookmarks are upserted; visits are appended with a
/// binary COPY.
async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
let (places_tx, mut places_rx) = tokio::sync::mpsc::channel(1000);
let (bookmarks_tx, mut bookmarks_rx) = tokio::sync::mpsc::channel(1000);
let (history_tx, mut history_rx) = tokio::sync::mpsc::channel(1000);
let self = self.clone();
let conn = ctx.pool.get().await?;
// Per-table high-water marks in microseconds since the epoch (0 when the table is empty).
let max_places_timestamp = conn.query_one("SELECT max(timestamp) FROM browser_places", &[]).await?.get::<_, Option<DateTime<Utc>>>(0);
let max_places_timestamp = max_places_timestamp.map(|x| x.timestamp_micros()).unwrap_or(0);
let max_bookmarks_timestamp = conn.query_one("SELECT max(timestamp) FROM bookmarks", &[]).await?.get::<_, Option<DateTime<Utc>>>(0);
let max_bookmarks_timestamp = max_bookmarks_timestamp.map(|x| x.timestamp_micros()).unwrap_or(0);
let max_history_timestamp = conn.query_one("SELECT max(timestamp) FROM history", &[]).await?.get::<_, Option<DateTime<Utc>>>(0);
let max_history_timestamp = max_history_timestamp.map(|x| x.timestamp_micros()).unwrap_or(0);
// SQLite is read on a blocking thread. The channels are drained below in the
// same order read_database fills them: places, then bookmarks, then history.
let bg = tokio::task::spawn_blocking(move || self.read_database(places_tx, max_places_timestamp, bookmarks_tx, max_bookmarks_timestamp, history_tx, max_history_timestamp));
while let Some((id, url, title, visit_count, last_visit_date, description, preview_image_url)) = places_rx.recv().await {
// NUL characters are stripped from title and description; a missing last_visit_date becomes the Unix epoch.
conn.execute("INSERT INTO browser_places VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO UPDATE SET timestamp = $2, visit_count = $5", &[&hash_str(&id), &DateTime::from_timestamp_micros(last_visit_date.unwrap_or_default()).context("invalid time")?, &url, &title.map(|x| x.replace("\0", "")), &visit_count, &description.map(|x| x.replace("\0", "")), &preview_image_url]).await?;
}
while let Some((id, place, title, date_added, last_modified)) = bookmarks_rx.recv().await {
conn.execute("INSERT INTO bookmarks VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO UPDATE SET title = $3, timestamp = $5", &[&hash_str(&id), &hash_str(&place), &title, &DateTime::from_timestamp_micros(date_added).context("invalid time")?, &DateTime::from_timestamp_micros(last_modified).context("invalid time")?]).await?;
}
let sink = conn.copy_in("COPY history (id, place, timestamp, ty) FROM STDIN BINARY").await?;
let writer = BinaryCopyInWriter::new(sink, &[Type::INT8, Type::INT8, Type::TIMESTAMPTZ, Type::INT4]);
pin_mut!(writer);
while let Some((id, place, ts, ty)) = history_rx.recv().await {
let id: i64 = hash_str(&id);
let place = hash_str(&place);
let timestamp = DateTime::from_timestamp_micros(ts).context("invalid time")?;
writer.as_mut().write(&[&id, &place, &timestamp, &ty]).await?;
}
writer.finish().await?;
// Surface both a panic in the reader task and an error it returned.
bg.await??;
Ok(())
}
/// The column content is returned unchanged as the URL.
fn url_for(&self, _table: &str, column_content: &str) -> String {
column_content.to_string()
}
}
impl FirefoxHistoryDumpIndexer {
    /// Builds the indexer from its TOML configuration table.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let config: Config = config.try_into()?;
        Ok(Box::new(FirefoxHistoryDumpIndexer {
            config: Arc::new(config)
        }))
    }

    /// Streams rows newer than the given minimum timestamps from the SQLite
    /// dump: first all places, then all bookmarks, then all history visits.
    /// Each sender is dropped as soon as its table is finished, which ends the
    /// matching receive loop in `run`.
    ///
    /// Blocking: must run on a blocking thread, not in async code.
    fn read_database(&self, places: Sender<(String, String, Option<String>, i64, Option<i64>, Option<String>, Option<String>)>, places_min_ts: i64, bookmarks: Sender<(String, String, String, i64, i64)>, bookmarks_min_ts: i64, history: Sender<(String, String, i64, i32)>, history_min_ts: i64) -> Result<()> {
        let conn = rusqlite::Connection::open_with_flags(&self.config.db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
        // Apparently some records in my history database have no last_visit_date. I don't know what happened to it or why, but this is not fixable, so add an ugly hack for them.
        // Only the opt-in query may match NULL dates; previously both branches
        // held the same SQL, so the configuration flag had no effect.
        let places_query = if self.config.include_untimestamped_records {
            "SELECT guid, url, title, visit_count, last_visit_date, description, preview_image_url FROM places WHERE last_visit_date > ? OR last_visit_date IS NULL ORDER BY last_visit_date ASC"
        } else {
            "SELECT guid, url, title, visit_count, last_visit_date, description, preview_image_url FROM places WHERE last_visit_date > ? ORDER BY last_visit_date ASC"
        };
        let mut fetch_places = conn.prepare(places_query)?;
        for row in fetch_places.query_map([places_min_ts], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, row.get(5)?, row.get(6)?))
        })? {
            let row = row?;
            places.blocking_send(row)?;
        }
        std::mem::drop(places);

        let mut fetch_bookmarks = conn.prepare("SELECT guid, bookmark, title, dateAdded, lastModified FROM bookmarks WHERE lastModified > ? AND title IS NOT NULL ORDER BY lastModified ASC")?;
        for row in fetch_bookmarks.query_map([bookmarks_min_ts], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?))
        })? {
            let row = row?;
            bookmarks.blocking_send(row)?;
        }
        std::mem::drop(bookmarks);

        let mut fetch_history = conn.prepare("SELECT id, place, date, type FROM historyvisits WHERE date > ? ORDER BY date ASC")?;
        for row in fetch_history.query_map([history_min_ts], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
        })? {
            let row = row?;
            history.blocking_send(row)?;
        }
        std::mem::drop(history);

        Ok(())
    }
}

View File

@ -0,0 +1,559 @@
use std::fmt;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use std::str::FromStr;
use anyhow::{anyhow, Context, Result};
use compact_str::CompactString;
use futures::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tokio::sync::RwLock;
use crate::util::{hash_str, parse_date, parse_html, systemtime_to_utc, urlencode, CONFIG};
use crate::indexer::{Ctx, Indexer, TableSpec, delete_nonexistent_files, ColumnSpec};
use async_walkdir::WalkDir;
use chrono::prelude::*;
use regex::{RegexSet, Regex};
// Configuration for the media files indexer, deserialized from its TOML table.
#[derive(Serialize, Deserialize, Debug)]
struct Config {
// Root directory that is walked for media files.
path: String,
// Regexes tested against each file's path relative to `path`; matching files are skipped.
#[serde(default)]
ignore_regexes: Vec<String>,
// Regexes tested against container tag values; tags whose value matches are discarded.
#[serde(default)]
ignore_metadata: Vec<String>,
// Prefix put in front of the URL-encoded relative path when building result URLs.
base_url: String
}
// Indexer that walks a directory tree and stores ffprobe-derived metadata in the `media_files` table.
pub struct MediaFilesIndexer {
config: Config,
// Compiled form of `config.ignore_regexes`, tested against relative file paths.
ignore_files: RegexSet,
// Compiled form of `config.ignore_metadata`; shared with the per-file parse calls.
ignore_metadata: Arc<RegexSet>
}
// Automatic-subtitling state of a media file.
// The discriminant is cast to i16 and written to the `auto_subs_state` column, so the numeric values are part of the stored data.
#[derive(Debug)]
enum AutoSubsState {
// Usable subtitles were already found in the file.
NotRequired = 0,
// Default state for a freshly parsed file.
Pending = 1,
// Not assigned anywhere in this file; presumably set by whatever generates the subtitles - TODO confirm.
Generated = 2
}
impl Default for AutoSubsState {
fn default() -> Self {
AutoSubsState::Pending
}
}
// Everything extracted from one media file by `parse_media`.
#[derive(Debug, Default)]
struct MediaParse {
// One entry per stream: the codec name, or the codec type when no name is reported.
format: Vec<String>,
creation_timestamp: Option<DateTime<Utc>>,
title: Option<String>,
description: Option<String>,
// Taken from the name of the file's parent directory.
series: Option<String>,
season: Option<i32>,
episode: Option<i32>,
creator: Option<String>,
url: Option<String>,
// Seconds; the maximum of the container duration and all stream durations.
duration: f32,
// Subtitle text as produced by `parse_srt`, if a usable subtitle stream was found.
subs: Option<String>,
// One "[start -> end] title" line per chapter.
chapters: String,
// The complete ffprobe JSON output.
raw_parse: serde_json::Value,
auto_subs: AutoSubsState
}
#[derive(Deserialize, Debug)]
struct Chapter {
start_time: String,
end_time: String,
tags: HashMap<String, String>
}
// Subset of a stream's `disposition` object in ffprobe's JSON output.
#[derive(Deserialize, Debug)]
struct Disposition {
// A value of 1 gives the stream a higher subtitle score in `score_subtitle_stream`.
default: u8
}
// One entry of the `streams` array in ffprobe's JSON output (only the fields used here).
#[derive(Deserialize, Debug)]
struct Stream {
// Stream index; passed to ffmpeg's `-map 0:<index>` when extracting subtitles.
index: usize,
codec_name: Option<String>,
// Stream duration in seconds as a decimal string, when reported.
duration: Option<String>,
codec_type: String,
// May be absent from the JSON; defaults to an empty map.
#[serde(default)]
tags: HashMap<String, String>,
// Presence of a width disqualifies the stream as a subtitle source.
width: Option<u32>,
disposition: Option<Disposition>
}
#[derive(Deserialize, Debug)]
struct Format {
duration: String,
tags: HashMap<String, String>
}
// Top-level shape of the ffprobe JSON output requested by `parse_media`
// (`-show_streams`, `-show_chapters`, `-show_format`).
#[derive(Deserialize, Debug)]
struct Probe {
streams: Vec<Stream>,
chapters: Vec<Chapter>,
format: Format
}
lazy_static::lazy_static! {
// Case-insensitive "s<digits>e<digits>" marker, plus any run of further 'e', digit or '-' characters
// (e.g. the "-03" of "s01e02-03") and trailing whitespace. Group 1 is the season, group 2 the episode.
static ref SEASON_EPNUM: Regex = Regex::new(r"(?i:s(\d+)e(\d+)[e0-9-]*\s*)").unwrap();
// "hours:minutes:seconds" where the seconds carry a fraction separated by '.' or ','.
static ref DURATION_STRING: Regex = Regex::new(r"(\d+):(\d+):(\d+[.,]\d+)").unwrap();
}
/// Converts an `H:M:S.fff` (or `H:M:S,fff`) timestamp found in `s` into seconds.
///
/// # Errors
/// Fails if `s` contains no such timestamp or one of its components is not a valid number.
fn parse_duration(s: &str) -> Result<f32> {
    let caps = DURATION_STRING.captures(s).context("duration misformatted")?;
    let hours = f32::from_str(caps.get(1).unwrap().as_str())?;
    let minutes = f32::from_str(caps.get(2).unwrap().as_str())?;
    // SRT writes the fractional separator as a comma; normalise it before parsing.
    let seconds = f32::from_str(&caps.get(3).unwrap().as_str().replace(",", "."))?;
    Ok(hours * 3600.0 + minutes * 60.0 + seconds)
}
/// Appends `seconds` to `f` as `HH:MM:SS`, each part zero-padded to two digits.
/// Any fraction of a second is dropped.
///
/// # Panics
/// Panics if writing to `f` fails.
fn format_duration<W: fmt::Write>(seconds: f32, mut f: W) {
    let hours = (seconds / 3600.0).floor();
    let after_hours = seconds - 3600.0 * hours;
    let minutes = (after_hours / 60.0).floor();
    let whole_seconds = (after_hours - 60.0 * minutes).floor();
    write!(f, "{:02}:{:02}:{:02}", hours, minutes, whole_seconds).unwrap();
}
// State of the line-oriented SRT reader in `parse_srt`.
#[derive(Debug)]
enum SRTParseState {
// Start of a cue: a numeric line leads to ExpectTime, anything else to ExpectData.
ExpectSeqNumOrData,
// The next line must be a "start --> end" time line.
ExpectTime,
// Subtitle text lines follow until an empty line.
ExpectData
}
fn parse_srt(srt: &str) -> Result<String> {
use SRTParseState::*;
let mut out = String::new();
let mut time = (0.0, 0.0);
let mut state = ExpectSeqNumOrData;
for line in srt.lines() {
match state {
ExpectSeqNumOrData => {
if usize::from_str(line).is_ok() { state = ExpectTime }
else { state = ExpectData }
},
ExpectTime => {
let (fst, snd) = line.split_once(" --> ").context("invalid time line")?;
time = (parse_duration(fst)?, parse_duration(snd)?);
state = ExpectData;
},
ExpectData => {
if line == "" {
state = ExpectSeqNumOrData;
} else {
let line = &parse_html(line.as_bytes(), false).0; // SRT can contain basic HTML for some reason
if !out.trim_end().ends_with(line.trim_end()) {
out.push_str("[");
format_duration(time.0, &mut out);
out.push_str(" -> ");
format_duration(time.1, &mut out);
out.push_str("] ");
out.push_str(line);
out.push_str("\n");
}
}
}
}
}
Ok(out)
}
fn parse_filename(path: &PathBuf) -> Option<(String, String, Option<i32>, Option<i32>)> {
let stem = path.file_stem()?.to_str()?;
let dirname = path.parent()?.file_name()?.to_str()?;
if let Some(mat) = SEASON_EPNUM.captures(stem) {
let name = &stem[mat.get(0).unwrap().end()..];
let se = i32::from_str(mat.get(1).unwrap().as_str()).unwrap();
let ep = i32::from_str(mat.get(2).unwrap().as_str()).unwrap();
Some((dirname.to_string(), name.to_string(), Some(se), Some(ep)))
} else {
Some((dirname.to_string(), stem.to_string(), None, None))
}
}
#[cfg(test)]
mod test {
    /// Season/episode markers are stripped from the title; a path without a parent directory name is rejected.
    #[test]
    fn test_filename_parser() {
        use std::{path::PathBuf, str::FromStr};
        assert_eq!(super::parse_filename(&PathBuf::from_str("Test 1/s01e02-03 Example.mkv").unwrap()), Some((
            String::from("Test 1"),
            String::from("Example"),
            Some(1),
            Some(2)
        )));
        assert_eq!(super::parse_filename(&PathBuf::from_str("Test 2/s9e9 Simple.mkv").unwrap()), Some((
            String::from("Test 2"),
            String::from("Simple"),
            Some(9),
            Some(9)
        )));
        assert_eq!(super::parse_filename(&PathBuf::from_str("Invalid").unwrap()), None);
    }

    /// An SRT-style timestamp parses to seconds and formats back at whole-second resolution.
    #[test]
    fn test_duration() {
        use super::{parse_duration, format_duration};
        let mut s = String::new();
        format_duration(parse_duration("00:01:02,410").unwrap(), &mut s);
        // format_duration floors to whole seconds, so the fractional part is not emitted.
        assert_eq!(s, "00:01:02");
    }
}
/// Rates how suitable `stream` is as a subtitle source; higher is better.
///
/// `i8::MIN` means unusable, negative values mean "usable only as a last resort",
/// 1 is an ordinary subtitle stream and 2 is one flagged as default.
/// `others` is the full stream list, used to detect duplicated YouTube tracks.
fn score_subtitle_stream(stream: &Stream, others: &Vec<Stream>, parse_so_far: &MediaParse) -> i8 {
    // A width marks a video subtitle track, which is unusable; so is anything that is not a subtitle.
    if stream.width.is_some() || stream.codec_type != "subtitle" {
        return i8::MIN;
    }
    // subtitles from these videos are problematic - TODO this is not a very elegant way to do this
    if let Some(description) = &parse_so_far.description {
        if description.starts_with("https://web.microsoftstream.com/video/") {
            return i8::MIN;
        }
    }
    // YouTube subtitle tracks are weird, at least as I download them
    let from_youtube = match &parse_so_far.url {
        Some(url) => url.starts_with("https://www.youtube.com"),
        None => false
    };
    if from_youtube {
        // Due to something, I appear to sometimes have streams duplicated under different labels.
        for other in others {
            let same_track = other.index != stream.index
                && stream.tags.get("DURATION") == other.tags.get("DURATION")
                && stream.tags.get("language") == other.tags.get("language")
                && stream.codec_name == other.codec_name;
            if same_track {
                // Score the duplicate on its own (empty sibling list stops the recursion) and inherit a bad score.
                let duplicate_score = score_subtitle_stream(other, &Vec::new(), parse_so_far);
                if duplicate_score < 0 {
                    return duplicate_score;
                }
            }
        }
        let title = stream.tags.get("title").map(String::as_str).unwrap_or_default();
        if title == "English (Original)" {
            return -1;
        }
        if title.starts_with("English from") && title != "English from English" {
            return -2;
        }
    }
    if matches!(stream.disposition, Some(Disposition { default: 1 })) {
        return 2;
    }
    1
}
/// Runs ffprobe (and, when a usable subtitle stream exists, ffmpeg) on `path` and collects the results.
///
/// Container tags whose value matches `ignore` are discarded before any metadata is read from them.
/// Title, series, season and episode come from the filename first; a tag title is only used when the
/// filename gave none, while season/episode tags override the filename.
///
/// # Errors
/// Fails if ffprobe cannot be run or exits unsuccessfully, if its output does not parse, or if a
/// numeric field (duration, season, episode, chapter time) or the extracted SRT is malformed.
async fn parse_media(path: &PathBuf, ignore: Arc<RegexSet>) -> Result<MediaParse> {
    let mut probe_cmd = Command::new("ffprobe");
    probe_cmd
        .arg("-hide_banner")
        .arg("-print_format").arg("json")
        .arg("-v").arg("error")
        .arg("-show_chapters")
        .arg("-show_format")
        .arg("-show_streams")
        .arg(path);
    let probe_output = probe_cmd.output().await?;
    if !probe_output.status.success() {
        return Err(anyhow!("ffmpeg failure: {}", String::from_utf8_lossy(&probe_output.stderr)))
    }
    // The output is decoded twice: once into the typed view and once verbatim for storage.
    let probe: Probe = serde_json::from_slice(&probe_output.stdout).context("ffmpeg parse")?;
    let mut parsed = MediaParse {
        raw_parse: serde_json::from_slice(&probe_output.stdout)?,
        ..Default::default()
    };

    if let Some((series, title, season, episode)) = parse_filename(path) {
        parsed.series = Some(series);
        parsed.title = Some(title);
        parsed.season = season;
        parsed.episode = episode;
    }

    {
        let mut tags = probe.format.tags;
        tags.retain(|_key, value| !ignore.is_match(value));
        let date_tag = tags.get("creation_time").or_else(|| tags.get("DATE")).or_else(|| tags.get("date"));
        if let Some(date) = date_tag {
            parsed.creation_timestamp = parse_date(date).ok();
        }
        if parsed.title.is_none() {
            parsed.title = tags.get("title").cloned();
        }
        parsed.url = tags.get("PURL").cloned();
        parsed.description = tags.get("DESCRIPTION")
            .or_else(|| tags.get("comment"))
            .or_else(|| tags.get("synopsis"))
            .cloned();
        parsed.creator = tags.get("ARTIST").or_else(|| tags.get("artist")).cloned();
        if let Some(season) = tags.get("season_number") {
            parsed.season = Some(i32::from_str(season)?);
        }
        if let Some(episode) = tags.get("episode_sort") {
            parsed.episode = Some(i32::from_str(episode)?);
        }
    }

    parsed.duration = f32::from_str(&probe.format.duration)?;
    let mut best_index = 0;
    let mut best_score = i8::MIN;
    // For some reason I forgot using stream duration was more accurate in some circumstance.
    for stream in &probe.streams {
        if let Some(stream_duration) = &stream.duration {
            parsed.duration = parsed.duration.max(f32::from_str(stream_duration)?);
        }
        if let Some(tag_duration) = stream.tags.get("DURATION") {
            parsed.duration = parsed.duration.max(parse_duration(tag_duration)?);
        }
        let codec_label = stream.codec_name.as_ref().unwrap_or(&stream.codec_type);
        parsed.format.push(codec_label.to_string());
        let score = score_subtitle_stream(stream, &probe.streams, &parsed);
        if score > best_score {
            best_index = stream.index;
            best_score = score;
        }
    }

    // if we have any remotely acceptable subtitles, use ffmpeg to read them out
    if best_score > i8::MIN {
        let mut extract_cmd = Command::new("ffmpeg");
        extract_cmd
            .arg("-hide_banner").arg("-v").arg("quiet")
            .arg("-i").arg(path)
            .arg("-map").arg(format!("0:{}", best_index))
            .arg("-f").arg("srt")
            .arg("-");
        let extracted = extract_cmd.output().await?;
        if extracted.status.success() {
            let subs = parse_srt(&String::from_utf8_lossy(&extracted.stdout))?;
            if !subs.is_empty() {
                parsed.subs = Some(subs);
                // don't schedule automatic subtitling if existing ones okay
                if best_score > 0 {
                    parsed.auto_subs = AutoSubsState::NotRequired;
                }
            }
        }
    }

    for chapter in probe.chapters {
        let chapter_title = chapter.tags.get("title").map(String::as_str).unwrap_or("");
        parsed.chapters.push_str("[");
        format_duration(f32::from_str(&chapter.start_time)?, &mut parsed.chapters);
        parsed.chapters.push_str(" -> ");
        format_duration(f32::from_str(&chapter.end_time)?, &mut parsed.chapters);
        parsed.chapters.push_str("] ");
        parsed.chapters.push_str(chapter_title);
        parsed.chapters.push_str("\n");
    }
    Ok(parsed)
}
// Indexer implementation backing the `media_files` table.
#[async_trait::async_trait]
impl Indexer for MediaFilesIndexer {
fn name(&self) -> &'static str {
"media_files"
}
// DDL for the single table this indexer owns. Column order matters: `run` inserts positionally.
fn schemas(&self) -> &'static [&'static str] {
&[r#"
CREATE TABLE media_files (
id BIGINT PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
timestamp TIMESTAMPTZ NOT NULL,
format TEXT[] NOT NULL,
creation_timestamp TIMESTAMPTZ,
title TEXT,
description TEXT,
series TEXT,
season INTEGER,
episode INTEGER,
creator TEXT,
url TEXT,
duration REAL NOT NULL,
subs TEXT,
auto_subs_state SMALLINT NOT NULL,
chapters TEXT,
probe JSONB NOT NULL
);
"#]
}
// Per-column search settings for `media_files`; the meaning of the individual flags is defined by ColumnSpec.
fn tables(&self) -> &'static [TableSpec] {
&[
TableSpec {
name: "media_files",
parent: None,
columns: &[
ColumnSpec {
name: "path",
fts: true,
fts_short: true,
trigram: true,
is_array: false
},
ColumnSpec {
name: "title",
fts: true,
fts_short: true,
trigram: true,
is_array: false
},
ColumnSpec {
name: "description",
fts: true,
fts_short: false,
trigram: false,
is_array: false
},
ColumnSpec {
name: "series",
fts: false,
fts_short: false,
trigram: true,
is_array: false
},
ColumnSpec {
name: "creator",
fts: false,
fts_short: false,
trigram: true,
is_array: false
},
ColumnSpec {
name: "subs",
fts: true,
fts_short: false,
trigram: false,
is_array: false
},
ColumnSpec {
name: "chapters",
fts: true,
fts_short: false,
trigram: false,
is_array: false
}
],
url_source_column: Some("path"),
title_source_column: "title",
summary_columns: &["path", "timestamp", "format", "creation_timestamp", "title", "series", "season", "episode", "url", "duration"]
}
]
}
// Walks `config.path`, (re)parses every non-ignored regular file whose modification time is newer than
// the stored timestamp, then calls `delete_nonexistent_files` with the set of paths seen during the walk.
async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
let entries = WalkDir::new(&self.config.path);
let ignore = &self.ignore_files;
let base_path = &self.config.path;
let ignore_metadata = self.ignore_metadata.clone();
// Relative paths of all files seen in this walk.
let existing_files = Arc::new(RwLock::new(HashSet::new()));
entries
.map_err(|e| anyhow::Error::from(e))
.filter(|r| {
// ignore permissions errors because things apparently break otherwise
// (note that this drops every walk error, not only permission errors)
let keep = match r {
Err(_e) => false,
_ => true
};
async move { keep }
})
.try_for_each_concurrent(Some(CONFIG.concurrency), |entry| {
let ctx = ctx.clone();
let existing_files = existing_files.clone();
let ignore_metadata = ignore_metadata.clone();
async move {
let real_path = entry.path();
// Paths that are not valid UTF-8 are silently skipped.
let path = if let Some(path) = real_path.strip_prefix(base_path)?.to_str() {
path
} else {
return Result::Ok(());
};
if ignore.is_match(path) || !entry.file_type().await?.is_file() {
return Ok(());
}
let mut conn = ctx.pool.get().await?;
existing_files.write().await.insert(CompactString::from(path));
let metadata = entry.metadata().await?;
// Rows are keyed by a hash of the relative path.
let row = conn.query_opt("SELECT timestamp FROM media_files WHERE id = $1", &[&hash_str(path)]).await?;
// Files without a row get the minimum timestamp so they always count as modified.
let timestamp: DateTime<Utc> = row.map(|r| r.get(0)).unwrap_or(DateTime::<Utc>::MIN_UTC);
let modtime = systemtime_to_utc(metadata.modified()?)?;
if modtime > timestamp {
match parse_media(&real_path, ignore_metadata).await {
Ok(x) => {
// Replace the row atomically: delete any old version and insert the new one in one transaction.
let tx = conn.transaction().await?;
tx.execute("DELETE FROM media_files WHERE id = $1", &[&hash_str(path)]).await?;
tx.execute("INSERT INTO media_files VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)", &[
&hash_str(path),
&path,
&modtime,
&x.format,
&x.creation_timestamp,
&x.title,
&x.description,
&x.series,
&x.season,
&x.episode,
&x.creator,
&x.url,
&x.duration,
&x.subs,
&(x.auto_subs as i16),
&x.chapters,
&tokio_postgres::types::Json(x.raw_parse)
]).await?;
tx.commit().await?;
},
// A file that fails to parse is logged and skipped; it does not abort the whole run.
Err(e) => {
log::warn!("Media parse {}: {:?}", &path, e)
}
}
}
Result::Ok(())
}
}).await?;
{
let existing = existing_files.read().await;
delete_nonexistent_files(ctx, "SELECT path FROM media_files", "DELETE FROM media_files WHERE id = $1", &existing).await?;
}
Ok(())
}
// Result URL: the configured base URL followed by the URL-encoded relative path.
fn url_for(&self, _table: &str, column_content: &str) -> String {
format!("{}{}", self.config.base_url, urlencode(column_content))
}
}
impl MediaFilesIndexer {
    /// Builds the indexer from its TOML configuration table, compiling both ignore lists up front.
    ///
    /// # Errors
    /// Fails if the table does not deserialize into `Config` or a regex does not compile.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let config: Config = config.try_into()?;
        let ignore_files = RegexSet::new(&config.ignore_regexes)?;
        let ignore_metadata = Arc::new(RegexSet::new(&config.ignore_metadata)?);
        let indexer = MediaFilesIndexer { config, ignore_files, ignore_metadata };
        Ok(Box::new(indexer))
    }
}

View File

@ -0,0 +1,103 @@
use std::sync::Arc;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use tokio_postgres::NoTls;
use crate::{indexer::{ColumnSpec, Ctx, Indexer, TableSpec}, util::parse_html};
use chrono::prelude::*;
// Configuration for the Miniflux indexer, deserialized from its TOML table.
#[derive(Serialize, Deserialize)]
struct Config {
// Connection string passed to `tokio_postgres::connect` for the Miniflux database.
source_database: String
}
// Indexer that copies feed entries from a Miniflux database into the `rss_items` table.
#[derive(Clone)]
pub struct MinifluxIndexer {
config: Arc<Config>
}
/// Indexer implementation backing the `rss_items` table.
#[async_trait::async_trait]
impl Indexer for MinifluxIndexer {
    fn name(&self) -> &'static str {
        "miniflux"
    }

    /// DDL for the single table this indexer owns. Column order matters: `run` inserts positionally.
    fn schemas(&self) -> &'static [&'static str] {
        &[r#"
CREATE TABLE rss_items (
id BIGINT PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
title TEXT NOT NULL,
url TEXT NOT NULL,
content_html TEXT NOT NULL,
content TEXT NOT NULL,
author TEXT,
feed_title TEXT NOT NULL
);
"#]
    }

    /// Per-column search settings for `rss_items`.
    fn tables(&self) -> &'static [TableSpec] {
        &[
            TableSpec {
                name: "rss_items",
                parent: None,
                columns: &[
                    ColumnSpec {
                        name: "title",
                        fts: true,
                        fts_short: false,
                        trigram: true,
                        is_array: false
                    },
                    ColumnSpec {
                        name: "content",
                        fts: true,
                        fts_short: false,
                        trigram: false,
                        is_array: false
                    }
                ],
                url_source_column: Some("url"),
                title_source_column: "title",
                summary_columns: &["timestamp", "title", "url", "feed_title"]
            }
        ]
    }

    /// Copies every Miniflux entry with an id above the highest id already stored into `rss_items`,
    /// adding a plain-text rendering of the HTML content.
    ///
    /// # Errors
    /// Fails on connection or query errors against either database.
    async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
        let (src_conn, connection) = tokio_postgres::connect(&self.config.source_database, NoTls).await?;
        // The connection object drives the socket and has to be polled for queries to make progress.
        tokio::spawn(connection);
        let dest_conn = ctx.pool.get().await?;
        // -1 when the table is empty, so that every source entry qualifies.
        let max_id = dest_conn.query_one("SELECT max(id) FROM rss_items", &[]).await?.get::<_, Option<i64>>(0).unwrap_or(-1);
        let rows = src_conn.query("SELECT entries.id, entries.published_at, entries.title, entries.url, entries.content, entries.author, feeds.title FROM entries JOIN feeds ON feeds.id = entries.feed_id WHERE entries.id > $1 ORDER BY entries.id", &[&max_id]).await?;
        for row in rows {
            let id: i64 = row.get(0);
            let pubdate: DateTime<Utc> = row.get(1);
            let title: String = row.get(2);
            let url: String = row.get(3);
            let content_html: String = row.get(4);
            // The destination column is nullable; reading as Option avoids a panic on a NULL author.
            let author: Option<String> = row.get(5);
            let feed_title: String = row.get(6);
            let content = parse_html(content_html.as_bytes(), false).0;
            dest_conn.execute("INSERT INTO rss_items VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", &[&id, &pubdate, &title, &url, &content_html, &content, &author, &feed_title]).await?;
        }
        Ok(())
    }

    /// The stored `url` column is already the result URL.
    fn url_for(&self, _table: &str, column_content: &str) -> String {
        column_content.to_string()
    }
}
impl MinifluxIndexer {
    /// Builds the indexer from its TOML configuration table.
    ///
    /// # Errors
    /// Fails if the table does not deserialize into `Config`.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let parsed: Config = config.try_into()?;
        let indexer = MinifluxIndexer { config: Arc::new(parsed) };
        Ok(Box::new(indexer))
    }
}

View File

@ -0,0 +1,348 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use serde::{Deserialize, Serialize};
use crate::util::hash_str;
use crate::indexer::{Ctx, Indexer, TableSpec, ColumnSpec};
use chrono::prelude::*;
use rusqlite::OpenFlags;
// Configuration for the Minoteaur indexer, deserialized from its TOML table.
#[derive(Serialize, Deserialize)]
struct Config {
// Path of the Minoteaur SQLite database; it is opened read-only.
db_path: String,
// Prefix of result URLs; "#/page/<ulid>" is appended to it.
base_url: String
}
// Indexer that imports pages, structured data and files from a Minoteaur database.
// Clone is needed so that a copy can be moved into the blocking reader task.
#[derive(Clone)]
pub struct MinoteaurIndexer {
config: Arc<Config>
}
// Type definitions for the objects stored in a Minoteaur database, based on the upstream sources:
// https://github.com/osmarks/minoteaur-8/blob/master/src/storage.rs
// https://github.com/osmarks/minoteaur-8/blob/master/src/util.rs
// NOTE(review): these are decoded with rmp_serde in `read_database`, so field and variant order
// presumably has to stay in sync with upstream - confirm before reordering anything.
mod minoteaur_types {
use serde::{Deserialize, Serialize};
use ulid::Ulid;
use chrono::Utc;
use std::collections::{BTreeSet, HashMap};
// A structured data value: either text or a number.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Value {
Text(String),
Number(f64)
}
// Size of a page's content in words, bytes and lines.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub struct ContentSize {
pub words: usize,
pub bytes: usize,
pub lines: usize
}
// Key/value pairs attached to a page.
pub type StructuredData = Vec<(String, Value)>;
// A file attached to a page.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct File {
#[serde(with="ulid::serde::ulid_as_u128")]
pub page: Ulid,
pub filename: String,
#[serde(with="chrono::serde::ts_milliseconds")]
pub created: chrono::DateTime<Utc>,
pub storage_path: String,
pub size: u64,
pub mime_type: String,
pub metadata: HashMap<String, String>
}
// A page in its current state.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Page {
#[serde(with="ulid::serde::ulid_as_u128")]
pub id: Ulid,
#[serde(with="chrono::serde::ts_milliseconds")]
pub updated: chrono::DateTime<Utc>,
#[serde(with="chrono::serde::ts_milliseconds")]
pub created: chrono::DateTime<Utc>,
pub title: String,
pub names: BTreeSet<String>,
pub content: String,
pub tags: BTreeSet<String>,
pub size: ContentSize,
#[serde(default)]
pub files: HashMap<String, File>,
#[serde(default)]
pub icon_filename: Option<String>,
#[serde(default)]
pub structured_data: StructuredData,
#[serde(default)]
pub theme: Option<String>
}
// What a revision changed.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum RevisionType {
AddName(String),
AddTag(String),
ContentUpdate { new_content_size: ContentSize, edit_distance: Option<u32> },
PageCreated,
RemoveName(String),
RemoveTag(String),
AddFile(String),
RemoveFile(String),
SetIconFilename(Option<String>),
SetStructuredData(StructuredData),
SetTheme(Option<String>),
Rename(String)
}
// A record of a page being viewed at a point in time.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PageView {
#[serde(with="ulid::serde::ulid_as_u128")]
pub page: Ulid,
#[serde(with="chrono::serde::ts_milliseconds")]
pub time: chrono::DateTime<Utc>
}
// Metadata of one revision of a page.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RevisionHeader {
#[serde(with="ulid::serde::ulid_as_u128")]
pub id: Ulid,
#[serde(with="ulid::serde::ulid_as_u128")]
pub page: Ulid,
pub ty: RevisionType,
#[serde(with="chrono::serde::ts_milliseconds")]
pub time: chrono::DateTime<Utc>
}
// One row of the `objects` table after decoding.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Object {
Page(Page),
Revision(RevisionHeader),
PageView(PageView)
}
}
/// Indexer implementation backing the `mino_pages`, `mino_structured_data` and `mino_files` tables.
#[async_trait::async_trait]
impl Indexer for MinoteaurIndexer {
    fn name(&self) -> &'static str {
        "minoteaur"
    }

    /// DDL for the tables this indexer owns. `run` inserts into `mino_pages` positionally.
    fn schemas(&self) -> &'static [&'static str] {
        &[r#"
CREATE TABLE IF NOT EXISTS mino_pages (
id BIGINT PRIMARY KEY,
ulid TEXT NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
created TIMESTAMPTZ NOT NULL,
title TEXT NOT NULL,
words INTEGER NOT NULL,
tags TEXT[] NOT NULL,
names TEXT[] NOT NULL,
content TEXT NOT NULL,
view_count BIGINT NOT NULL,
revision_count BIGINT NOT NULL,
last_view_timestamp TIMESTAMPTZ NOT NULL
);
CREATE TABLE IF NOT EXISTS mino_structured_data (
id BIGSERIAL PRIMARY KEY,
page BIGINT NOT NULL REFERENCES mino_pages(id) ON DELETE CASCADE,
key TEXT NOT NULL,
numeric_value DOUBLE PRECISION,
text_value TEXT
);
CREATE TABLE IF NOT EXISTS mino_files (
id BIGSERIAL PRIMARY KEY,
page BIGINT NOT NULL REFERENCES mino_pages(id) ON DELETE CASCADE,
filename TEXT NOT NULL,
size INTEGER NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
metadata JSONB NOT NULL
);
"#]
    }

    /// Per-column search settings for the three tables; the child tables point at `mino_pages` via `page`.
    fn tables(&self) -> &'static [TableSpec] {
        &[
            TableSpec {
                name: "mino_pages",
                parent: None,
                columns: &[
                    ColumnSpec {
                        name: "title",
                        fts: true,
                        fts_short: true,
                        trigram: true,
                        is_array: false
                    },
                    ColumnSpec {
                        name: "content",
                        fts: true,
                        fts_short: false,
                        trigram: false,
                        is_array: false
                    },
                    ColumnSpec {
                        name: "tags",
                        fts: true,
                        fts_short: true,
                        trigram: true,
                        is_array: true
                    },
                    ColumnSpec {
                        name: "names",
                        fts: true,
                        fts_short: true,
                        trigram: true,
                        is_array: true
                    }
                ],
                url_source_column: Some("ulid"),
                title_source_column: "title",
                summary_columns: &["title", "tags", "timestamp", "created", "words", "view_count", "revision_count"]
            },
            TableSpec {
                name: "mino_structured_data",
                parent: Some(("page", "mino_pages")),
                columns: &[
                    ColumnSpec {
                        name: "key",
                        fts: false,
                        fts_short: false,
                        trigram: true,
                        is_array: false
                    },
                    ColumnSpec {
                        name: "text_value",
                        fts: true,
                        fts_short: true,
                        trigram: true,
                        is_array: false
                    }
                ],
                title_source_column: "key",
                url_source_column: None,
                summary_columns: &["key", "text_value", "numeric_value"]
            },
            TableSpec {
                name: "mino_files",
                parent: Some(("page", "mino_pages")),
                columns: &[
                    ColumnSpec {
                        name: "filename",
                        fts: true,
                        fts_short: true,
                        trigram: true,
                        is_array: false
                    }
                ],
                url_source_column: None,
                title_source_column: "filename",
                summary_columns: &["filename", "size", "timestamp", "metadata"]
            }
        ]
    }

    /// Replays the Minoteaur object log into the destination tables.
    ///
    /// Objects arrive in id order from a blocking reader task. Pages that changed since the last run
    /// are replaced wholesale; page views and revisions newer than what was already counted bump the
    /// page's counters.
    ///
    /// # Errors
    /// Fails on database errors, on an unparseable stored ULID, or if the reader task fails.
    async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
        let (object_tx, mut object_rx) = tokio::sync::mpsc::channel(200);
        let reader = self.clone();
        let bg = tokio::task::spawn_blocking(move || reader.read_database(object_tx));

        // Per page: (last update, last counted view) as of the previous run.
        let mut timestamps = HashMap::new();
        let mut conn = ctx.pool.get().await?;
        for row in conn.query("SELECT ulid, timestamp, last_view_timestamp FROM mino_pages", &[]).await? {
            let ulid: String = row.get(0);
            let updated: DateTime<Utc> = row.get(1);
            let last_view_timestamp: DateTime<Utc> = row.get(2);
            timestamps.insert(ulid::Ulid::from_string(&ulid)?, (updated, last_view_timestamp));
        }

        while let Some((id, object)) = object_rx.recv().await {
            match object {
                minoteaur_types::Object::Page(page) => {
                    // If we already have the latest information on this page, skip it.
                    if let Some((updated_timestamp, _last_view_timestamp)) = timestamps.get(&id) {
                        if *updated_timestamp >= page.updated {
                            continue;
                        }
                    }
                    let ulid = id.to_string();
                    let int_id = hash_str(&ulid);
                    // Saturate rather than wrap if a count does not fit the INTEGER column.
                    let words = i32::try_from(page.size.words).unwrap_or(i32::MAX);
                    let tags: Vec<String> = page.tags.into_iter().collect();
                    let names: Vec<String> = page.names.into_iter().collect();
                    let tx = conn.transaction().await?;
                    // Deleting the page also removes its structured data and files (ON DELETE CASCADE).
                    tx.execute("DELETE FROM mino_pages WHERE id = $1", &[&int_id]).await?;
                    tx.execute("INSERT INTO mino_pages VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 0, 0, $10)",
                        &[&int_id, &ulid, &page.updated, &page.created, &page.title, &words, &tags, &names, &page.content, &page.created])
                        .await?;
                    for (key, value) in page.structured_data {
                        let (num, text) = match value {
                            minoteaur_types::Value::Number(x) => (Some(x), None),
                            minoteaur_types::Value::Text(t) => (None, Some(t))
                        };
                        tx.execute("INSERT INTO mino_structured_data (page, key, numeric_value, text_value) VALUES ($1, $2, $3, $4)", &[&int_id, &key, &num, &text]).await?;
                    }
                    for (_key, file) in page.files {
                        let size = i32::try_from(file.size).unwrap_or(i32::MAX);
                        tx.execute("INSERT INTO mino_files (page, filename, size, timestamp, metadata) VALUES ($1, $2, $3, $4, $5)", &[&int_id, &file.filename, &size, &file.created, &tokio_postgres::types::Json(file.metadata)]).await?;
                    }
                    tx.commit().await?;
                    // The row was just recreated with zeroed counters, so the cutoffs from the previous run
                    // no longer apply: forget them so that all of this page's views and revisions are recounted.
                    timestamps.remove(&id);
                },
                // These should only occur after the page's record, with the exception of page creation.
                minoteaur_types::Object::PageView(view) => {
                    if let Some((_updated_timestamp, last_view_timestamp)) = timestamps.get(&view.page) {
                        if *last_view_timestamp >= view.time {
                            continue;
                        }
                    }
                    let int_id = hash_str(&view.page.to_string());
                    conn.execute("UPDATE mino_pages SET view_count = view_count + 1, last_view_timestamp = $2 WHERE id = $1", &[&int_id, &view.time]).await?;
                },
                minoteaur_types::Object::Revision(rev) => {
                    // There's no separate "last revision timestamp" because revisions should always be associated with the updated field being adjusted.
                    if let Some((updated_timestamp, _last_view_timestamp)) = timestamps.get(&rev.page) {
                        if *updated_timestamp >= rev.time {
                            continue;
                        }
                    }
                    if let minoteaur_types::RevisionType::PageCreated = rev.ty {
                        continue;
                    }
                    let int_id = hash_str(&rev.page.to_string());
                    conn.execute("UPDATE mino_pages SET revision_count = revision_count + 1 WHERE id = $1", &[&int_id]).await?;
                }
            }
        }
        // Minoteaur doesn't have a delete button so not supporting deletes is clearly fine, probably.
        bg.await??;
        Ok(())
    }

    /// Result URL: the configured base URL followed by `#/page/` and the page's ULID.
    fn url_for(&self, _table: &str, column_content: &str) -> String {
        format!("{}#/page/{}", self.config.base_url, column_content)
    }
}
impl MinoteaurIndexer {
    /// Builds the indexer from its TOML configuration table.
    ///
    /// # Errors
    /// Fails if the table does not deserialize into `Config`.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let parsed: Config = config.try_into()?;
        let indexer = MinoteaurIndexer { config: Arc::new(parsed) };
        Ok(Box::new(indexer))
    }

    /// Reads every object from the Minoteaur database in ascending id order and sends it, decoded,
    /// to `target`. Uses synchronous rusqlite calls and `blocking_send`.
    ///
    /// # Errors
    /// Fails on SQLite errors, on an id that is not 16 bytes long, on undecodable object data,
    /// or if the receiver has gone away.
    fn read_database(&self, target: tokio::sync::mpsc::Sender<(ulid::Ulid, minoteaur_types::Object)>) -> Result<()> {
        let conn = rusqlite::Connection::open_with_flags(&self.config.db_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
        // Minoteaur databases are structured so that the system state can be understood by reading objects in ID order, as ID increases with timestamp.
        // Technically the clocks might not be perfectly monotonic but this is unlikely enough to ever be significant that I don't care.
        let mut objs_stmt = conn.prepare("SELECT id, data FROM objects ORDER BY id ASC")?;
        let raw_rows = objs_stmt.query_map([], |row| {
            let id: Vec<u8> = row.get(0)?;
            let data: Vec<u8> = row.get(1)?;
            Ok((id, data))
        })?;
        for raw in raw_rows {
            let (id_bytes, data) = raw?;
            let id = ulid::Ulid::from_bytes(id_bytes.try_into().map_err(|_| anyhow!("conversion failure"))?);
            let object = rmp_serde::decode::from_slice(&data).context("parse object")?;
            target.blocking_send((id, object))?;
        }
        Ok(())
    }
}

View File

@ -0,0 +1,10 @@
pub mod textfiles;
pub mod books;
pub mod mediafiles;
pub mod anki;
pub mod minoteaur;
pub mod thunderbird_email;
pub mod miniflux;
pub mod atuin;
pub mod rclwe;
pub mod firefox_history_dump;

View File

@ -0,0 +1,160 @@
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc};
use anyhow::{Context, Result};
use futures::pin_mut;
use serde::{Deserialize, Serialize};
use crate::{indexer::{ColumnSpec, Ctx, Indexer, TableSpec}, util::{hash_str, parse_html, systemtime_to_utc}};
use chrono::prelude::*;
use tokio_postgres::{binary_copy::BinaryCopyInWriter, types::Type};
// Configuration for the rclwe indexer, deserialized from its TOML table.
#[derive(Serialize, Deserialize)]
struct Config {
// Directory scanned for saved web pages and their metadata files.
input: String,
// Directory that processed files are moved into.
output: String
}
// Indexer that imports saved web pages from the input directory into the `webpages` table.
#[derive(Clone)]
pub struct RclweIndexer {
config: Arc<Config>
}
/// Parses `key = value` lines into a map, ignoring empty lines.
///
/// Only the first ` = ` on a line separates key from value; when a key repeats, the last value wins.
/// Returns `None` as soon as a non-empty line has no ` = ` separator.
fn parse_circache_meta(s: &str) -> Option<HashMap<String, String>> {
    s.lines()
        .filter(|line| !line.is_empty())
        .map(|line| {
            line.split_once(" = ")
                .map(|(key, value)| (key.to_string(), value.to_string()))
        })
        .collect()
}
/// Moves a file by copying it to `to` and then deleting `from`.
///
/// # Errors
/// Fails if the copy or the removal fails; after a failed removal the copy at `to` remains.
async fn move_file(from: &PathBuf, to: PathBuf) -> Result<()> {
    let _bytes_copied = tokio::fs::copy(from, to).await?;
    Ok(tokio::fs::remove_file(from).await?)
}
#[async_trait::async_trait]
impl Indexer for RclweIndexer {
fn name(&self) -> &'static str {
"rclwe"
}
fn schemas(&self) -> &'static [&'static str] {
&[r#"
CREATE TABLE webpages (
id BIGSERIAL PRIMARY KEY,
timestamp TIMESTAMPTZ NOT NULL,
url TEXT NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
html TEXT NOT NULL
);
"#]
}
fn tables(&self) -> &'static [TableSpec] {
&[
TableSpec {
name: "webpages",
parent: None,
columns: &[
ColumnSpec {
name: "title",
fts: true,
fts_short: true,
trigram: true,
is_array: false
},
ColumnSpec {
name: "content",
fts: true,
fts_short: false,
trigram: false,
is_array: false
}
],
url_source_column: Some("url"),
title_source_column: "title",
summary_columns: &["timestamp", "url", "title"]
}
]
}
async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
let conn = ctx.pool.get().await?;
let sink = conn.copy_in("COPY webpages (timestamp, url, title, content, html) FROM STDIN BINARY").await?;
let input = PathBuf::from(&self.config.input);
let mut readdir = tokio::fs::read_dir(&self.config.input).await?;
let output = PathBuf::from(&self.config.output);
let writer = BinaryCopyInWriter::new(sink, &[Type::TIMESTAMPTZ, Type::TEXT, Type::TEXT, Type::TEXT, Type::TEXT]);
pin_mut!(writer);
while let Some(entry) = readdir.next_entry().await? {
if entry.file_type().await?.is_file() {
let path = entry.path();
let name = path.file_name().unwrap().to_str().context("invalid path")?;
// for some reason there are several filename formats in storage
let mut metadata = None;
if name.starts_with("firefox-recoll-web") {
let meta_file = "_".to_owned() + name;
let mtime = systemtime_to_utc(entry.metadata().await?.modified()?)?;
if let Some(url) = tokio::fs::read_to_string(input.join(&meta_file)).await?.lines().next() {
metadata = Some((mtime, url.to_string()));
tokio::fs::rename(input.join(&meta_file), output.join(&meta_file)).await?;
} else {
log::warn!("Metadata error for {}", name);
}
}
if let Some(rem) = name.strip_suffix(".html") {
let meta_file = format!("{}.dic", rem);
let meta = parse_circache_meta(&tokio::fs::read_to_string(input.join(&meta_file)).await?).context("invalid metadata format")?;
let mtime = i64::from_str(meta.get("fmtime").context("invalid metadata format")?)?;
metadata = Some((DateTime::from_timestamp(mtime, 0).context("time is broken")?, meta.get("url").context("invalid metadata format")?.to_string()));
move_file(&input.join(&meta_file), output.join(&meta_file)).await?;
}
if let Some(rem) = name.strip_prefix("recoll-we-c") {
let meta_file = format!("recoll-we-m{}", rem);
let mtime = systemtime_to_utc(entry.metadata().await?.modified()?)?;
if tokio::fs::try_exists(input.join(&meta_file)).await? {
if let Some(url) = tokio::fs::read_to_string(input.join(&meta_file)).await?.lines().next() {
metadata = Some((mtime, url.to_string()));
move_file(&input.join(&meta_file), output.join(&meta_file)).await?;
} else {
log::warn!("Metadata error for {}", name);
}
} else {
log::warn!("No metadata for {}", name);
}
}
if let Some((mtime, url)) = metadata {
let html = tokio::fs::read(&path).await?;
move_file(&path, output.join(name)).await?;
let (content, title) = parse_html(html.as_slice(), true);
writer.as_mut().write(&[&mtime, &url, &title.replace("\0", ""), &content.replace("\0", ""), &String::from_utf8_lossy(&html).replace("\0", "")]).await?;
}
}
}
writer.finish().await?;
Ok(())
}
fn url_for(&self, _table: &str, column_content: &str) -> String {
column_content.to_string()
}
}
impl RclweIndexer {
    /// Builds the indexer from its TOML configuration table.
    ///
    /// # Errors
    /// Fails if the table does not deserialize into `Config`.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let parsed: Config = config.try_into()?;
        let indexer = RclweIndexer { config: Arc::new(parsed) };
        Ok(Box::new(indexer))
    }
}

View File

@ -0,0 +1,167 @@
use std::collections::HashSet;
use std::ffi::OsStr;
use std::sync::Arc;
use anyhow::Result;
use compact_str::CompactString;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::util::{hash_str, parse_html, parse_pdf, systemtime_to_utc, urlencode, CONFIG};
use crate::indexer::{Ctx, Indexer, TableSpec, delete_nonexistent_files, ColumnSpec};
use async_walkdir::WalkDir;
use chrono::prelude::*;
use regex::RegexSet;
/// Settings for the text-file indexer, deserialised from its TOML table.
#[derive(Serialize, Deserialize)]
struct Config {
/// Root directory that is walked for files to index.
path: String,
/// Regular expressions matched against each file's path relative to `path`; matching files are skipped.
/// Defaults to an empty list when absent from the configuration.
#[serde(default)]
ignore_regexes: Vec<String>,
/// Prefix placed in front of the URL-encoded relative path when building a link to a file.
base_url: String
}
/// Indexer that stores the text of PDF, plain-text and HTML files in the `text_files` table.
pub struct TextFilesIndexer {
/// Parsed configuration.
config: Config,
/// `Config::ignore_regexes` compiled into a single set.
ignore: RegexSet
}
lazy_static::lazy_static! {
// File extensions the text-file indexer will consider; files with any other (or no) extension are skipped.
static ref VALID_EXTENSIONS: HashSet<&'static str> = ["pdf", "txt", "html", "htm", "xhtml"].into_iter().collect();
}
#[async_trait::async_trait]
impl Indexer for TextFilesIndexer {
// Identifier of this indexer.
fn name(&self) -> &'static str {
"text_files"
}
// SQL creating this indexer's table. `run` fills `id` with `hash_str` of the relative path.
fn schemas(&self) -> &'static [&'static str] {
&[r#"
CREATE TABLE text_files (
id BIGINT PRIMARY KEY,
path TEXT NOT NULL UNIQUE,
title TEXT,
content TEXT NOT NULL,
timestamp TIMESTAMPTZ
);
"#]
}
// Describes the `text_files` table: which columns are searchable and which are shown in summaries.
fn tables(&self) -> &'static [TableSpec] {
&[
TableSpec {
name: "text_files",
parent: None,
columns: &[
ColumnSpec {
name: "path",
fts: true,
fts_short: true,
trigram: true,
is_array: false
},
ColumnSpec {
name: "title",
fts: true,
fts_short: true,
trigram: true,
is_array: false
},
ColumnSpec {
name: "content",
fts: true,
fts_short: false,
trigram: false,
is_array: false
}
],
url_source_column: Some("path"),
title_source_column: "title",
summary_columns: &["path", "title", "timestamp"]
}
]
}
// Walks the configured directory, (re)parsing every supported file whose modification time is newer
// than the stored timestamp, then hands the set of seen paths to `delete_nonexistent_files`.
async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
let entries = WalkDir::new(&self.config.path); // TODO
let ignore = &self.ignore;
let base_path = &self.config.path;
// Relative paths of every file accepted during this walk.
let existing_files = Arc::new(RwLock::new(HashSet::new()));
entries.map_err(|e| anyhow::Error::from(e)).try_for_each_concurrent(Some(CONFIG.concurrency), |entry|
{
let ctx = ctx.clone();
let existing_files = existing_files.clone();
async move {
let real_path = entry.path();
// Path relative to the configured root; entries whose relative path is not valid UTF-8 are skipped.
let path = if let Some(path) = real_path.strip_prefix(base_path)?.to_str() {
path
} else {
return Result::Ok(());
};
let ext = real_path.extension().and_then(OsStr::to_str);
// Skip ignored paths, anything that is not a regular file, and unsupported extensions.
if ignore.is_match(path) || !entry.file_type().await?.is_file() || !VALID_EXTENSIONS.contains(ext.unwrap_or_default()) {
return Ok(());
}
let mut conn = ctx.pool.get().await?;
existing_files.write().await.insert(CompactString::from(path));
let metadata = entry.metadata().await?;
// A file without a stored row compares against the minimum timestamp, so it is always parsed.
let row = conn.query_opt("SELECT timestamp FROM text_files WHERE id = $1", &[&hash_str(path)]).await?;
let timestamp: DateTime<Utc> = row.map(|r| r.get(0)).unwrap_or(DateTime::<Utc>::MIN_UTC);
let modtime = systemtime_to_utc(metadata.modified()?)?;
if modtime > timestamp {
// Each arm yields Ok(Some((content, title))); plain-text files get an empty title.
let parse = match ext {
Some("pdf") => {
parse_pdf(&real_path).await.map(Some)
},
Some("txt") => {
let content = tokio::fs::read(&real_path).await?;
Ok(Some((String::from_utf8_lossy(&content).to_string(), String::new())))
},
Some("htm") | Some("html") | Some("xhtml") => {
let content = tokio::fs::read(&real_path).await?;
Ok(Some(tokio::task::block_in_place(|| parse_html(&content, true))))
},
_ => Ok(None),
};
match parse {
Ok(None) => (),
Ok(Some((content, title))) => {
// Null bytes aren't legal in Postgres strings despite being valid UTF-8.
// The old row is deleted and the new one inserted inside one transaction.
let tx = conn.transaction().await?;
tx.execute("DELETE FROM text_files WHERE id = $1", &[&hash_str(path)]).await?;
tx.execute("INSERT INTO text_files VALUES ($1, $2, $3, $4, $5)",
&[&hash_str(path), &path, &title.replace("\0", ""), &content.replace("\0", ""), &modtime])
.await?;
tx.commit().await?;
},
// A parse failure is logged and does not abort the walk.
Err(e) => log::warn!("File parse for {}: {}", path, e)
}
}
Result::Ok(())
}
}).await?;
{
// NOTE(review): presumably removes rows whose path was not seen during this walk; confirm against delete_nonexistent_files.
let existing = existing_files.read().await;
delete_nonexistent_files(ctx, "SELECT path FROM text_files", "DELETE FROM text_files WHERE id = $1", &existing).await?;
}
Ok(())
}
// Link target for a file: the configured base URL followed by the URL-encoded relative path.
fn url_for(&self, _table: &str, column_content: &str) -> String {
format!("{}{}", self.config.base_url, urlencode(column_content))
}
}
impl TextFilesIndexer {
    /// Builds the indexer from its TOML table, compiling the ignore patterns up front.
    ///
    /// # Errors
    /// Fails if the table does not deserialise into `Config` or if a pattern is not a valid regex.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let settings: Config = config.try_into()?;
        let ignore = RegexSet::new(&settings.ignore_regexes)?;
        Ok(Box::new(TextFilesIndexer { ignore, config: settings }))
    }
}

View File

@ -0,0 +1,242 @@
use std::collections::HashSet;
use std::{collections::HashMap, path::PathBuf};
use std::sync::Arc;
use anyhow::{Context, Result};
use async_walkdir::WalkDir;
use compact_str::ToCompactString;
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
use crate::indexer::{Ctx, Indexer, TableSpec, ColumnSpec};
use crate::util::{hash_thing, parse_html};
use chrono::prelude::*;
use tokio::{fs::File, io::{AsyncBufReadExt, BufReader}};
use mail_parser::MessageParser;
use std::future::Future;
/// Settings for the email indexer, deserialised from its TOML table.
#[derive(Serialize, Deserialize)]
struct Config {
/// Root directory that is walked for mbox files.
mboxes_path: String,
/// Maps the name of the folder directly containing an mbox to the account label stored with its messages.
/// Mboxes in folders without an entry are not indexed.
account_mapping: HashMap<String, String>,
/// Mbox file stems that are skipped.
ignore_mboxes: HashSet<String>
}
/// Indexer that imports messages from mbox files into the `emails` table.
#[derive(Clone)]
pub struct EmailIndexer {
/// Shared configuration.
config: Arc<Config>
}
/// Message body as extracted from the parsed mail.
#[derive(Debug, Serialize, Deserialize)]
enum Body {
/// Plain-text body, stored as-is.
Plain(String),
/// HTML body; the indexer runs it through `parse_html` before storing it.
Html(String)
}
/// One message extracted from an mbox file by `read_mbox`.
#[derive(Debug, Serialize, Deserialize)]
struct Email {
/// The `Message-ID` header, or the subject when that header is absent.
message_id: String,
/// The `In-Reply-To` header, if present.
reply_to: Option<String>,
/// The message date.
date: DateTime<Utc>,
/// The bytes accumulated for this message from the mbox.
raw: Vec<u8>,
/// Sender display name, falling back to the sender address.
from: String,
/// Sender address.
from_address: String,
/// Subject; empty when the message has none.
subject: String,
/// Body; the HTML part is preferred over the plain-text part.
body: Body
}
lazy_static::lazy_static! {
// Case-insensitively matches a `Content-Type: multipart/mixed|alternative` header that ends the input with CRLF.
// Capture group 2 is the boundary string.
static ref MULTIPART_REGEX: regex::bytes::Regex = regex::bytes::RegexBuilder::new(r#"content-type:\s*multipart/(mixed|alternative);\s*boundary="?([A-Za-z0-9=_-]+)"?;?\r\n$"#).case_insensitive(true).build().unwrap();
}
async fn read_mbox<U: Future<Output=Result<()>>, F: FnMut(Email) -> U>(path: &PathBuf, mut callback: F) -> Result<()> {
let input = File::open(path).await?;
let mut buf = Vec::new();
let parse_current = |buf: &mut Vec<u8>| -> Result<Option<Email>> {
let message = MessageParser::default().parse(buf.as_slice()).context("parse error")?;
if message.date().is_none() || message.from().is_none() { buf.clear(); return Ok(None); }
let email = Email {
message_id: message.header("Message-ID").map(|h| h.as_text().context("parse error")).transpose()?.or_else(|| message.subject()).context("no message ID or subject")?.to_string(),
reply_to: message.header("In-Reply-To").map(|s| s.as_text().context("parse error").map(|s| s.to_string())).transpose()?,
date: DateTime::from_timestamp(message.date().context("missing date")?.to_timestamp(), 0).context("invalid time")?,
raw: buf.clone(),
from: message.from().map(|c| {
let g = c.clone().into_group()[0].clone();
g.name.map(|x| x.to_string()).unwrap_or(g.addresses[0].name.as_ref().map(|x| x.to_string()).unwrap_or(g.addresses[0].address.as_ref().unwrap().to_string()))
}).context("missing from")?,
from_address: message.from().map(|c| {
let g = c.clone().into_group()[0].clone();
g.addresses[0].address.as_ref().unwrap().to_string()
}).context("missing from")?,
subject: message.subject().map(|x| x.to_string()).unwrap_or_default(),
body: message.body_html(0).map(|s| s.to_string()).map(Body::Html)
.or_else(|| message.body_text(0).map(|i| i.to_string()).map(Body::Plain)).context("they will never find the body")?
};
buf.clear();
Ok(Some(email))
};
let mut reader = BufReader::new(input);
let mut line = Vec::new();
let mut current_delim = None;
while let Ok(x) = reader.read_until(0x0A, &mut line).await {
if x == 0 {
break;
}
if line.is_empty() {
break;
}
// sorry.
// Thunderbird doesn't seem to escape "From " in message bodies at all.
// I originally thought I could just detect lines containing only "From ", but it turns out that some of them contain further text!
// I don't know how Thunderbird itself parses these, but my best guess is that it's parsing the mbox and MIME at the same time.
// I don't want to so I'm accursedly partially parsing the MIME.
if let Some(cap) = MULTIPART_REGEX.captures(&buf) {
if let Some(m) = cap.get(2) {
current_delim = Some(m.as_bytes().to_vec());
}
}
if let Some(ref delim) = current_delim {
if line.len() > 6 && &line[2..(line.len()-4)] == delim.as_slice() && line.starts_with(b"--") && line.ends_with(b"--\r\n") {
current_delim = None;
}
}
if current_delim.is_none() && line.starts_with(b"From ") && !buf.is_empty() {
if let Ok(Some(mail)) = parse_current(&mut buf) {
callback(mail).await?;
}
} else {
buf.extend(&line);
}
line.clear();
}
if !buf.is_empty() { if let Ok(Some(mail)) = parse_current(&mut buf) { callback(mail).await? } }
Ok(())
}
#[async_trait::async_trait]
impl Indexer for EmailIndexer {
// Identifier of this indexer.
fn name(&self) -> &'static str {
"email"
}
// SQL creating this indexer's table. `run` fills `id` with a hash of the raw message bytes.
fn schemas(&self) -> &'static [&'static str] {
&[r#"
CREATE TABLE IF NOT EXISTS emails (
id BIGINT PRIMARY KEY,
message_id TEXT,
reply_to TEXT,
timestamp TIMESTAMPTZ NOT NULL,
raw BYTEA NOT NULL,
account TEXT NOT NULL,
box TEXT NOT NULL,
from_ TEXT,
from_addr TEXT,
subject TEXT NOT NULL,
body TEXT NOT NULL
);
"#]
}
// Describes the `emails` table: which columns are searchable and which are shown in summaries.
fn tables(&self) -> &'static [TableSpec] {
&[
TableSpec {
name: "emails",
parent: None,
columns: &[
ColumnSpec {
name: "from_",
fts: true,
fts_short: true,
trigram: true,
is_array: false
},
ColumnSpec {
name: "subject",
fts: true,
fts_short: false,
trigram: true,
is_array: false
},
ColumnSpec {
name: "body",
fts: true,
fts_short: false,
trigram: false,
is_array: false
}
],
url_source_column: None,
title_source_column: "subject",
summary_columns: &["account", "box", "subject", "from_", "timestamp"]
}
]
}
// Walks the mbox directory and spawns one task per mbox; each task upserts every message it reads.
// An entry is treated as an mbox when it has no extension, its parent folder name appears in
// `account_mapping`, and its stem is not listed in `ignore_mboxes`.
async fn run(&self, ctx: Arc<Ctx>) -> Result<()> {
let mut js: tokio::task::JoinSet<Result<()>> = tokio::task::JoinSet::new();
let mut entries = WalkDir::new(&self.config.mboxes_path);
let config = self.config.clone();
while let Some(entry) = entries.try_next().await? {
let path = entry.path();
let mbox = path.file_stem().and_then(|x| x.to_str()).context("invalid path")?.to_compact_string();
// NOTE(review): these unwraps panic if the parent has no final component or its name is not valid UTF-8.
let folder = path.parent().unwrap().file_name().unwrap().to_str().unwrap();
if let Some(account) = config.account_mapping.get(folder) {
let account = account.to_compact_string();
let ext = path.extension();
if let None = ext {
if !self.config.ignore_mboxes.contains(mbox.as_str()) {
let ctx = ctx.clone();
js.spawn(async move {
read_mbox(&path, move |mail| {
let ctx = ctx.clone();
// Any trailing "-1" is stripped from the mailbox name before it is stored.
let mbox = mbox.trim_end_matches("-1").to_compact_string();
let account = account.clone();
async move {
let conn = ctx.pool.get().await?;
let id = hash_thing(&mail.raw.as_slice());
// HTML bodies go through parse_html; only the first element of its result is stored.
let body = match mail.body {
Body::Plain(t) => t,
Body::Html(h) => parse_html(h.as_bytes(), false).0
};
// On an id conflict only the `box` column is updated.
conn.execute(r#"INSERT INTO emails VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (id) DO UPDATE SET box = $7"#, &[
&id,
&mail.message_id,
&mail.reply_to,
&mail.date,
&mail.raw,
&account.as_str(),
&mbox.as_str(),
&mail.from,
&mail.from_address,
&mail.subject,
&body
]).await?;
Ok(())
}
}).await
});
}
}
}
}
// Wait for every mbox task; the first join failure or task error is returned.
while let Some(next) = js.join_next().await {
next??;
}
Ok(())
}
// Not implemented: the `emails` table declares `url_source_column: None`.
fn url_for(&self, _table: &str, _column_content: &str) -> String {
unimplemented!()
}
}
impl EmailIndexer {
    /// Deserialises the indexer's TOML table into its `Config` and returns the boxed indexer.
    ///
    /// # Errors
    /// Fails when the table cannot be converted into `Config`.
    pub async fn new(config: toml::Table) -> Result<Box<Self>> {
        let parsed: Config = config.try_into()?;
        let indexer = EmailIndexer { config: Arc::new(parsed) };
        Ok(Box::new(indexer))
    }
}

View File

@ -0,0 +1,862 @@
use chrono::{DateTime, Utc};
use compact_str::{CompactString, ToCompactString};
use deadpool_postgres::{Manager, ManagerConfig, RecyclingMethod, Pool};
use futures::{StreamExt, TryFutureExt};
use indexer::{ColumnSpec, TableSpec};
use pgvector::HalfVector;
use semantic::SemanticCtx;
use tokio::signal::unix::{signal, SignalKind};
use tokio_postgres::{NoTls, Row};
use anyhow::{Context, Result};
use util::{get_column_string, urlencode};
use std::collections::{BTreeMap, HashMap};
use std::{str::FromStr, sync::Arc, fmt::Write};
use ntex::web;
use maud::{html, Markup, Render, DOCTYPE};
use serde::{Deserialize, Serialize};
use rs_abbreviation_number::NumericAbbreviate;
use sea_query_postgres::PostgresBinder;
mod indexer;
mod indexers;
mod util;
mod semantic;
mod error;
use crate::error::{Error, EResult};
use crate::util::CONFIG;
use crate::indexer::Indexer;
/// Generates the SQL for the bookkeeping objects that accompany every table of `ix`.
///
/// For each table this emits a `{name}_change_tracker` table plus a trigger function and
/// insert/update triggers that record the time of the last change per row, and for each
/// column marked `fts` a `{name}_{col}_fts_chunks` table holding chunk offsets and embeddings
/// of dimension `CONFIG.semantic.embedding_dim`.
fn generate_auxiliary_tables(ix: &Box<dyn Indexer>) -> String {
let mut buf = String::new();
for tbl in ix.tables().iter() {
// Change tracking: one timestamp per row, maintained by triggers.
write!(&mut buf, "
CREATE TABLE {name}_change_tracker (
id BIGINT PRIMARY KEY REFERENCES {name} (id) ON DELETE CASCADE,
timestamp TIMESTAMPTZ NOT NULL
);
CREATE FUNCTION {name}_track() RETURNS trigger AS $$
BEGIN
IF NEW IS DISTINCT FROM OLD THEN
INSERT INTO {name}_change_tracker VALUES (
new.id,
CURRENT_TIMESTAMP
)
ON CONFLICT (id) DO UPDATE SET timestamp = CURRENT_TIMESTAMP;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER {name}_track_insert AFTER INSERT ON {name}
FOR EACH ROW
EXECUTE FUNCTION {name}_track();
CREATE TRIGGER {name}_track_update AFTER UPDATE ON {name}
FOR EACH ROW
EXECUTE FUNCTION {name}_track();
", name=tbl.name).unwrap();
for col in tbl.columns {
if col.fts {
// Chunk table for a searchable column: chunk bounds within the document plus the embedding.
write!(&mut buf, "
CREATE TABLE {name}_{col}_fts_chunks (
document BIGINT NOT NULL REFERENCES {name} (id) ON DELETE CASCADE,
start INTEGER NOT NULL,
length INTEGER NOT NULL,
embedding HALFVEC({dim})
);
", name=tbl.name, col=col.name, dim=CONFIG.semantic.embedding_dim).unwrap();
}
}
}
buf
}
/// Wraps `body` in the shared page layout and returns it as a `text/html` 200 response.
///
/// The layout contains the stylesheet link, a `<title>`, a link back to `/` on every page
/// except the one titled "Index", and an `<h1>` with `title`.
fn page(title: &str, body: Markup) -> web::HttpResponse {
let mut response = web::HttpResponse::Ok();
let body = html! {
(DOCTYPE)
meta charset="utf-8";
link rel="stylesheet" href="/style.css";
title { "Maghammer " (title) }
@if title != "Index" {
a href="/" { "" }
}
h1 { (title) }
main {
(body)
}
}.render().0;
response.content_type("text/html").body(body)
}
/// Renders the search form, pre-filled from `value`.
///
/// The form submits to `/search` with the query text (`q`), the source mode (`src_mode`) and
/// the table to search (`table`); the table list offers "All" plus every table of every indexer.
/// "All" is pre-selected when `value.table` is unset.
fn search_bar(ctx: &ServerState, value: &SearchQuery) -> Markup {
html! {
form.search-bar action="/search" {
input type="search" placeholder="Query" value=(value.q) name="q";
select name="src_mode" {
option selected[value.src_mode == SearchSourceMode::Mix] { "Mix" }
option selected[value.src_mode == SearchSourceMode::Titles] { "Titles" }
option selected[value.src_mode == SearchSourceMode::Content] { "Content" }
}
select name="table" {
option selected[value.table.as_ref().map(|x| x == "All").unwrap_or(true)] { "All" }
@for ix in ctx.indexers.iter() {
@for table in ix.tables() {
option selected[value.table.as_ref().map(|x| x == table.name).unwrap_or(false)] { (table.name) }
}
}
}
input type="submit" value="Search";
}
}
}
// Landing page: the search bar followed by each indexer's tables with their row-count estimates.
//
// Counts are read from `pg_class.reltuples`. A table for which the catalogue returns no row
// is shown with a count of 0 instead of panicking on the missing map key.
#[web::get("/")]
async fn index_page(state: web::types::State<ServerState>) -> impl web::Responder {
    let table_names: Vec<&str> = state.indexers.iter()
        .flat_map(|i| i.tables())
        .map(|t| t.name)
        .collect();
    let conn = state.pool.get().await?;
    let mut counts: HashMap<String, i64> = HashMap::new();
    for row in conn.query("SELECT relname, reltuples::BIGINT FROM pg_class WHERE relname = ANY($1)", &[&table_names]).await? {
        counts.insert(row.get(0), row.get(1));
    }
    let body = html! {
        (search_bar(&state, &Default::default()))
        @for indexer in state.indexers.iter() {
            div {
                h2.colborder style=(util::hash_to_color("border-color", indexer.name())) { (indexer.name()) }
                @for table in indexer.tables() {
                    div {
                        a href=(url_for_table(indexer.name(), table.name, None, None)) { (table.name) } " / " (counts.get(table.name).copied().unwrap_or(0))
                    }
                }
            }
        }
    };
    EResult::Ok(page("Index", body))
}
/// Renders `s` with a `<br>` after every line so that line breaks survive in HTML.
fn with_newlines(s: &str) -> Markup {
html! {
@for line in s.lines() {
(line)
br;
}
}
}
/// Builds the path of a table listing (`/t/{indexer}/{table}`), optionally narrowed to a single
/// record and optionally carrying view controls as URL-encoded JSON in the `c` query parameter.
fn url_for_table(indexer: &str, table: &str, record: Option<i64>, controls: Option<TableViewControls>) -> String {
    let record_part = match record {
        Some(id) => format!("/{}", id),
        None => String::new()
    };
    let query_part = match controls {
        Some(view) => format!("?c={}", urlencode(&serde_json::to_string(&view).unwrap())),
        None => String::new()
    };
    format!("/t/{}/{}{}{}", indexer, table, record_part, query_part)
}
/// Recursively renders a JSON value as HTML.
///
/// Scalars are rendered as text (`null` literally), arrays as a `<ul>` of their items and
/// objects as a `<ul>` whose items hold the key and the rendered value in separate `<div>`s.
fn render_json_value(j: serde_json::Value) -> Markup {
use serde_json::Value::*;
match j {
Number(x) => html! { (x) },
Array(xs) => html! {
ul {
@for x in xs {
li {
(render_json_value(x))
}
}
}
},
String(s) => html! { (s) },
Bool(b) => html! { (b) },
Object(kv) => html! {
ul {
@for (k, v) in kv {
li {
div { (k) }
div { (render_json_value(v)) }
}
}
}
},
Null => html! { "null" }
}
}
/// Renders column `index` of `row` as HTML according to its Postgres type.
///
/// Returns `None` when the value is SQL NULL. For most types the textual form of the value is
/// passed to `generate_filter`, whose markup is emitted in front of the value; JSONB and BYTEA
/// values are rendered without it. A TEXT column named by the table's `url_source_column` is
/// wrapped in a link produced by `indexer.url_for`. `short_form` only affects text arrays:
/// comma-separated inline when true, a `<ul>` otherwise. Unsupported types render an error span.
fn render_column<F: Fn(&str) -> Markup>(row: &Row, index: usize, table: &TableSpec, indexer: &Box<dyn Indexer>, short_form: bool, generate_filter: F) -> Option<Markup> {
use tokio_postgres::types::Type;
let name = row.columns()[index].name();
let type_ = row.columns()[index].type_().clone();
// Every `?` below returns None for a NULL value.
Some(match type_ {
Type::BOOL => html! { (row.get::<usize, Option<bool>>(index)?) },
Type::TEXT => {
let content = &row.get::<usize, Option<String>>(index)?;
if let Some(usource) = table.url_source_column {
if name == usource {
html! {
(generate_filter(&content))
a href=(indexer.url_for(table.name, content)) {
(with_newlines(&content))
}
}
} else {
html! {
(generate_filter(&content))
(with_newlines(&content))
}
}
} else {
html! {
(generate_filter(&content))
(with_newlines(&content))
}
}
},
Type::TIMESTAMPTZ => {
let text = row.get::<usize, Option<DateTime<Utc>>>(index)?.format("%Y-%m-%d %H:%M:%S").to_compact_string();
html! { (generate_filter(&text)) (text) }
},
Type::TEXT_ARRAY => {
let texts = row.get::<usize, Option<Vec<String>>>(index)?;
let len = texts.len();
html! {
@if short_form {
@for (i, x) in texts.into_iter().enumerate() {
(generate_filter(&x))
(x)
// Separator after every element except the last.
@if i != len - 1 {
", "
}
}
} @else {
ul {
@for x in texts {
li {
(generate_filter(&x))
(x)
}
}
}
}
}
},
Type::INT2 => {
let text = row.get::<usize, Option<i16>>(index)?.to_compact_string();
html! { (generate_filter(&text)) (text) }
},
Type::INT4 => {
let text = row.get::<usize, Option<i32>>(index)?.to_compact_string();
html! { (generate_filter(&text)) (text) }
},
Type::INT8 => {
let text = row.get::<usize, Option<i64>>(index)?.to_compact_string();
html! { (generate_filter(&text)) (text) }
},
Type::FLOAT4 => {
let text = row.get::<usize, Option<f32>>(index)?.abbreviate_number(&Default::default());
html! { (generate_filter(&text)) (text) }
},
Type::FLOAT8 => {
let text = row.get::<usize, Option<f64>>(index)?.abbreviate_number(&Default::default());
html! { (generate_filter(&text)) (text) }
},
Type::JSONB => {
let value = row.get::<usize, Option<serde_json::Value>>(index)?;
html! { (render_json_value(value)) }
},
Type::BYTEA => {
// Bytes are shown as lossily decoded UTF-8 text.
let value: Vec<u8> = row.get::<usize, Option<Vec<u8>>>(index)?;
html! { (with_newlines(&String::from_utf8_lossy(&value))) }
},
_ => html! { span.error { (format!("cannot render type {:?}", type_)) } }
})
}
/// Kind of condition a table-view filter applies to a column.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
enum Filter {
/// Column matches `LIKE '%value%'`.
Contains,
/// Value (cast to the column's type) equals the column.
Equal,
/// Built as `value <= column` in the table view, so the bound is inclusive.
Gt,
/// Built as `value >= column` in the table view, so the bound is inclusive.
Lt,
/// Column is not NULL; the filter value is not part of the condition.
NotNull
}
impl Filter {
    /// Every filter kind, in the order they are offered in the filter forms.
    const VALUES: &'static [Filter] = &[Filter::Contains, Filter::Equal, Filter::Gt, Filter::Lt, Filter::NotNull];

    /// Label used for this filter in forms and query strings.
    fn name(&self) -> &'static str {
        match self {
            Self::Contains => "Contains",
            Self::Equal => "Equal",
            Self::Gt => "Gt",
            Self::Lt => "Lt",
            Self::NotNull => "NotNull"
        }
    }

    /// Inverse of `name`: the filter whose label is exactly `s`, or `None` for any other text.
    fn from_str(s: &str) -> Option<Self> {
        Self::VALUES.iter().find(|candidate| candidate.name() == s).cloned()
    }
}
/// Sorting, paging and filtering state of a table view; carried between requests as JSON.
#[derive(Deserialize, Serialize, Debug, Default, Clone)]
struct TableViewControls {
/// Column to order by, if any.
#[serde(default)]
sort: Option<CompactString>,
/// Order descending instead of ascending.
#[serde(default)]
sort_desc: bool,
/// Number of rows skipped before the first displayed row.
#[serde(default)]
offset: usize,
/// Active filters as `(column, kind, value)` triples; all of them must hold.
#[serde(default)]
filters: im::Vector<(CompactString, Filter, CompactString)>
}
/// Query-string parameters accepted by the table view.
#[derive(Deserialize, Serialize, Debug, Default, Clone)]
struct TableViewControlsWrapper {
/// JSON-serialised `TableViewControls`; anything unparseable falls back to the defaults.
#[serde(default)]
c: String,
/// Position of the filter being edited; a position past the end appends a new filter.
index: Option<usize>,
/// Column of the filter being edited.
column: Option<String>,
/// Name of the filter kind being set (see `Filter::name`).
filter: Option<String>,
/// Value of the filter being edited.
value: Option<String>,
/// Submit button used; "Delete" removes the filter at `index`, anything else sets it.
ctrl: Option<String>
}
// Rows shown per table-view page; the query fetches one extra row to detect whether a next page exists.
const COUNT: usize = 100;
// Paginated, sortable and filterable listing of one table's summary columns.
//
// View state arrives as JSON in the `c` query parameter; the optional `index`/`column`/`filter`/
// `value`/`ctrl` parameters apply one edit (set, append or delete a filter) to that state before
// the query is built.
//
// All of these parameters are user-controlled, so nothing derived from them may panic:
// a delete with an out-of-range index is ignored, a filter on a column the table does not have
// yields an error response, and the "Prev" link never computes an offset below zero.
#[web::get("/{indexer}/{table}")]
async fn table_view<'a>(state: web::types::State<ServerState>, path: web::types::Path<(String, String)>, config: web::types::Query<TableViewControlsWrapper>) -> impl web::Responder {
    use sea_query::*;
    let mut controls: TableViewControls = serde_json::from_str(&config.c).ok().unwrap_or_default();
    if let (Some(index), Some(column), Some(filter), Some(value)) = (config.index.as_ref(), config.column.as_ref(), config.filter.as_ref(), config.value.as_ref()) {
        if config.ctrl.as_ref().map(|x| x.as_str()).unwrap_or_default() == "Delete" {
            // `remove` panics on an out-of-range index; a stale or forged index is simply ignored.
            if *index < controls.filters.len() {
                controls.filters.remove(*index);
            }
        } else {
            let record = (column.to_compact_string(), Filter::from_str(filter).context("filter parse error")?, value.to_compact_string());
            if *index < controls.filters.len() {
                controls.filters[*index] = record;
            } else {
                controls.filters.push_back(record);
            }
        }
    }
    let (indexer, table) = path.into_inner();
    let indexer = state.indexer(&indexer)?;
    let table = indexer.table(&table).ok_or(Error::NotFound)?;
    let conn = state.pool.get().await?;
    // Column name -> SQL type name, used both for the filter forms and for casting filter values.
    let rows = conn.query("SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1", &[&table.name]).await?;
    let mut typemap = BTreeMap::new();
    for row in rows.into_iter() {
        let column: String = row.get(0);
        let mut ty: String = row.get(1);
        // TODO: information schema returns ARRAY for all arrays, and we handwave arrays later on in the code
        if &ty == "ARRAY" {
            ty = String::from("TEXT")
        }
        typemap.insert(column, ty);
    }
    // Result columns: id, the summary columns, then (for child tables) the parent's title and the foreign key.
    let mut query = sea_query::Query::select();
    query.from(table);
    query.column((table, Alias::new("id")));
    for col in table.summary_columns {
        query.column((table, Alias::new(*col)));
    }
    if let Some((this_column, other_table)) = table.parent {
        let other = indexer.table(other_table).unwrap();
        query.column((other, Alias::new(other.title_source_column)));
        query.column((table, Alias::new(this_column)));
        query.join(JoinType::InnerJoin, Alias::new(other_table), Expr::col((other, Alias::new("id"))).eq(Expr::col((table, Alias::new(this_column)))));
    }
    // One row more than a page, so the presence of a next page can be detected.
    query.limit((COUNT + 1) as u64);
    query.offset(controls.offset as u64);
    if let Some(sort) = &controls.sort {
        let sort = Alias::new(sort.as_str());
        query.order_by(sort, if controls.sort_desc { Order::Desc } else { Order::Asc });
    }
    for (column, filter_type, value) in controls.filters.iter() {
        let column_det = table.column(column);
        let column_alias = Alias::new(column.as_str());
        let column_expr = if column_det.map(|x| x.is_array).unwrap_or_default() {
            Expr::expr(SimpleExpr::FunctionCall(PgFunc::any(Expr::col(column_alias))))
        } else {
            Expr::col(column_alias)
        };
        // The column name comes from the request; an unknown one is an error, not a panic.
        let column_type = typemap.get(column.as_str()).context("unknown filter column")?;
        // As a very ugly hack to avoid our ability to deal with dynamic types neatly, force the database to take in strings and internally cast them.
        let value_expr = Expr::value(value.to_string()).cast_as(Alias::new("TEXT")).cast_as(Alias::new(column_type.to_string()));
        let expr = match filter_type {
            Filter::Contains => column_expr.like(format!("%{}%", value)),
            Filter::Equal => value_expr.eq(column_expr),
            Filter::Gt => Expr::expr(value_expr).lte(column_expr),
            Filter::Lt => Expr::expr(value_expr).gte(column_expr),
            Filter::NotNull => column_expr.is_not_null()
        };
        query.and_where(expr);
    }
    let query = query.build_postgres(sea_query::PostgresQueryBuilder);
    let rows = conn.query(&query.0, &query.1.as_params()).await?;
    // Link back to this table view with a modified set of controls.
    let control_link = |label: &str, new_controls: TableViewControls| {
        let url = url_for_table(indexer.name(), table.name, None, Some(new_controls));
        html! {
            a href=(url) { (label) }
        }
    };
    let new_control_string = serde_json::to_string(&controls).unwrap();
    let body = html! {
        div.filters {
            @for (i, (column, filter_type, value)) in controls.filters.iter().enumerate() {
                div {
                    form {
                        select name="column" {
                            @for ncolumn in typemap.keys() {
                                option selected[column.as_str() == ncolumn] { (ncolumn) }
                            }
                        }
                        select name="filter" {
                            @for filter in Filter::VALUES {
                                option selected[filter == filter_type] { (filter.name()) }
                            }
                        }
                        input type="text" name="value" value=(value);
                        input type="submit" name="ctrl" value="Set";
                        input type="submit" name="ctrl" value="Delete";
                        input type="hidden" name="index" value=(i);
                        input type="hidden" name="c" value=(&new_control_string);
                    }
                }
            }
            div {
                form {
                    select name="column" {
                        @for column in typemap.keys() {
                            option { (column) }
                        }
                    }
                    select name="filter" {
                        @for filter in Filter::VALUES {
                            option { (filter.name()) }
                        }
                    }
                    input type="text" name="value";
                    input type="submit" value="Add";
                    input type="hidden" name="index" value=(controls.filters.len());
                    input type="hidden" name="c" value=(&new_control_string);
                }
            }
        }
        table {
            tr {
                td;
                @for col in table.summary_columns.iter() {
                    td {
                        (col)
                        " "
                        (control_link("", TableViewControls {
                            sort: Some(col.to_compact_string()),
                            sort_desc: true,
                            ..controls.clone()
                        }))
                        " "
                        (control_link("", TableViewControls {
                            sort: Some(col.to_compact_string()),
                            sort_desc: false,
                            ..controls.clone()
                        }))
                    }
                }
                @if let Some((parent, _other_table)) = table.parent {
                    td {
                        (parent)
                    }
                }
            }
            @for row in &rows[0..COUNT.min(rows.len())] {
                @let id: i64 = row.get(0);
                tr {
                    td {
                        a href=(url_for_table(indexer.name(), table.name, Some(id), None)) { "Full" }
                    }
                    @for i in 1..(table.summary_columns.len() + 1) {
                        td {
                            @let generate_filter = |s: &str| {
                                html! {
                                    (control_link("", TableViewControls {
                                        filters: controls.filters.clone() + im::Vector::unit((table.summary_columns[i - 1].to_compact_string(), Filter::Equal, s.to_compact_string())),
                                        ..controls.clone()
                                    }))
                                    " "
                                }
                            };
                            (render_column(&row, i, table, indexer, true, generate_filter).unwrap_or(html!{}))
                        }
                    }
                    @if let Some((_parent, other_table)) = table.parent {
                        td {
                            @let title: Option<String> = row.get(table.summary_columns.len() + 1);
                            @let parent_id: Option<i64> = row.get(table.summary_columns.len() + 2);
                            @if let Some(parent_id) = parent_id {
                                a href=(url_for_table(indexer.name(), other_table, Some(parent_id), None)) { (title.unwrap_or_default()) }
                            }
                        }
                    }
                }
            }
        }
        div.footer {
            @if controls.offset > 0 {
                // The offset is user-supplied and may be smaller than a page; clamp at zero.
                (control_link("Prev", TableViewControls {
                    offset: controls.offset.saturating_sub(COUNT),
                    ..controls.clone()
                }))
                " "
            }
            @if rows.len() > COUNT {
                (control_link("Next", TableViewControls {
                    offset: controls.offset + COUNT,
                    ..controls.clone()
                }))
            }
        }
    };
    EResult::Ok(page(&format!("Table {}/{}", indexer.name(), table.name), body))
}
#[web::get("/{indexer}/{table}/{record}")]
async fn record_view(state: web::types::State<ServerState>, path: web::types::Path<(String, String, i64)>) -> impl web::Responder {
let (indexer, table, record) = path.into_inner();
let conn = state.pool.get().await?;
let row = conn.query_one(&format!("SELECT * FROM {} WHERE id = $1", table), &[&record]).await?;
let indexer = state.indexer(&indexer)?;
let table = indexer.table(&table).ok_or(Error::NotFound)?;
let fk_ref = if let Some((this_column, other_table)) = table.parent {
let other_tablespec = indexer.table(other_table).unwrap();
let mut other_id: i64 = 0;
for (i, col) in row.columns().iter().enumerate() {
if col.name() == this_column {
other_id = row.get(i);
}
}
let row = conn.query_one(&format!("SELECT {} FROM {} WHERE id = $1", other_tablespec.title_source_column, other_table), &[&other_id]).await?;
Some((this_column, row.get::<usize, String>(0), other_table))
} else { None };
let body = html! {
@for (i, col) in row.columns()[1..].iter().enumerate() {
div.wide-column {
div.column-name { (col.name()) }
@if let Some(ref fk_ref) = fk_ref {
@let (name, value, tablename) = fk_ref;
@if name == &col.name() {
div.content {
a href=(url_for_table(indexer.name(), tablename, row.get(i + 1), None)) { (value) }
}
} @else {
div.content { (render_column(&row, i + 1, table, indexer, false, |_| html!{}).unwrap_or(html!{})) }
}
} @else {
div.content { (render_column(&row, i + 1, table, indexer, false, |_| html!{}).unwrap_or(html!{})) }
}
}
}
};
EResult::Ok(page(&format!("Record {}/{}/{}", indexer.name(), table.name, record), body))
}
/// Which searchable columns a search covers, based on each column's `fts_short` flag.
#[derive(Deserialize, PartialEq, Eq, Clone, Copy, Debug)]
enum SearchSourceMode {
/// Both kinds of column; `fts_short` columns then contribute a single result each.
Mix,
/// Only columns with `fts_short` set.
Titles,
/// Only columns without `fts_short` set.
Content
}
impl Default for SearchSourceMode {
fn default() -> Self {
SearchSourceMode::Content
}
}
/// Query-string parameters of the search page.
#[derive(Deserialize, Default, Debug)]
struct SearchQuery {
/// The search text.
q: String,
/// Which kind of columns to search; defaults to `SearchSourceMode::Content`.
#[serde(default)]
src_mode: SearchSourceMode,
/// Restrict the search to this table; absent or "All" searches every table.
#[serde(default)]
table: Option<String>
}
/// One matching text chunk returned by `query_one_table`.
struct SearchResult {
/// Id of the document the chunk belongs to.
docid: i64,
/// Name of the indexer owning the table.
indexer: &'static str,
/// Table the document lives in.
table: &'static str,
/// Column the chunk was taken from.
column: &'static str,
/// Value of the table's title column for the document.
title: Option<String>,
/// Value of the table's URL source column, when the table has one.
url: Option<String>,
/// The database's match value scaled to an integer so it can be ordered.
match_quality: i32,
/// Text of the chunk.
text: String
}
async fn query_one_table(table: &TableSpec, indexer: &'static str, col: &ColumnSpec, state: ServerState, count: usize, embedding_choice: Arc<HalfVector>) -> Result<Vec<SearchResult>> {
let conn = state.pool.get().await?;
let rows = conn.query(&format!("SELECT document, start, length, embedding <#> $1 AS match FROM {}_{}_fts_chunks ORDER BY match LIMIT {}", table.name, col.name, count as i64), &[&*embedding_choice]).await?;
let mut chunks = HashMap::new();
for row in rows {
let docid: i64 = row.get(0);
let offset: i32 = row.get(1);
let length: i32 = row.get(2);
let matchq: f64 = row.get(3);
chunks.entry(docid).or_insert_with(Vec::new).push((offset, length, matchq));
}
let docids: Vec<i64> = chunks.keys().copied().collect();
let mut select_one_query = format!("SELECT id, {}, {} ", col.name, table.title_source_column);
if let Some(usource) = table.url_source_column {
write!(&mut select_one_query, ", {} ", usource).unwrap();
}
write!(&mut select_one_query, "FROM {} WHERE id = ANY($1)", table.name).unwrap();
let rows = conn.query(&select_one_query, &[&docids]).await?;
let mut results = Vec::with_capacity(count);
for row in rows {
let doc: i64 = row.get(0);
let text: String = get_column_string(&row, 1, col).context("missing document")?;
let title: Option<String> = row.get(2);
let usource: Option<String> = table.url_source_column.map(|_| row.get(3));
for (offset, length, matchq) in chunks[&doc].iter() {
// This is slightly deranged, but floats aren't Ord while ints are.
// We know that -1 <= matchq <= 1 so just convert to integers (slightly lossy but negligible).
let matchq_int = (*matchq * (i32::MAX as f64)) as i32;
results.push(SearchResult {
docid: doc,
indexer: indexer,
table: table.name,
column: col.name,
title: title.clone(),
url: usource.clone(),
match_quality: matchq_int,
text: text[*offset as usize..(offset + length) as usize].to_string()
})
}
}
Ok(results)
}
// Search page: embeds the query, searches every eligible column concurrently, groups the
// matching chunks by document and renders one result block per (document, column).
#[web::get("/search")]
async fn fts_page(state: web::types::State<ServerState>, query: web::types::Query<SearchQuery>) -> impl web::Responder {
let state = (*state).clone();
let (prefixed, unprefixed) = semantic::embed_query(&query.q, state.semantic.clone()).await?;
let prefixed = Arc::new(prefixed);
let unprefixed = Arc::new(unprefixed);
let mut results = HashMap::new();
let mut set = tokio::task::JoinSet::new();
for ix in state.indexers.iter() {
for table in ix.tables() {
for col in table.columns {
if !col.fts { continue }
// Honour the table restriction, if any ("All" means no restriction).
if query.table.as_ref().map(|x| x != table.name && x != "All").unwrap_or(false) { continue }
// Content mode skips short columns, Titles mode skips long ones, Mix keeps both.
match (query.src_mode, col.fts_short) {
(SearchSourceMode::Content, true) => continue,
(SearchSourceMode::Titles, false) => continue,
_ => ()
}
// We get a batch of a prefixed and unprefixed query.
// The prefixed one is better for asymmetric search, where the passages are longer than the queries.
// Some columns are not like this (their spec says so) so we use the model for symmetric search.
// This does result in a different distribution of dot products.
let (embedding_choice, count) = if col.fts_short {
(unprefixed.clone(), if query.src_mode == SearchSourceMode::Mix { 1 } else { 20 })
} else {
(prefixed.clone(), 20)
};
set.spawn(query_one_table(table, ix.name(), col, state.clone(), count, embedding_choice));
}
}
}
// Group chunks per (document, column); each group keeps its chunks ordered by match value.
// NOTE(review): the per-group score keeps the MAX match value and groups are then sorted ascending.
// With negated inner products (lower = better) this appears to rank a document by its weakest chunk; confirm this is intended.
// NOTE(review): chunks of one group that share the same integer match value overwrite each other in the map.
while let Some(res) = set.join_next().await {
let res = res.context("internal error")??;
for r in res {
let entry = results.entry((r.docid, r.indexer, r.table, r.column, r.title, r.url)).or_insert_with(|| (BTreeMap::new(), i32::MIN));
entry.1 = entry.1.max(r.match_quality);
// don't flip sign: PGVector already returns inner products negated
entry.0.insert(r.match_quality, r.text);
}
}
let mut results: Vec<_> = results.into_iter().collect();
results.sort_by_key(|x| x.1.1);
let body = html! {
(search_bar(&state, &query))
div {
@for ((docid, indexer, table, column, title, maybe_url), (chunks, _match_quality)) in results {
div.search-result {
div.colborder.result-header style=(util::hash_to_color("border-color", indexer)) {
a href=(url_for_table(indexer, table, None, None)) {
(table) "/" (column)
}
" / "
a href=(url_for_table(indexer, table, Some(docid), None)) {
@match title.as_ref().map(String::as_str) {
Some("") => "Record",
Some(x) => (x),
None => "Record"
}
}
@if let Some(url_base) = maybe_url {
" / "
a href=(state.indexer(indexer)?.url_for(table, url_base.as_str())) {
(url_base)
}
}
}
// The first chunk in map order is shown directly; the rest go behind an "Expand" toggle.
// Every group holds at least one chunk, so the unwrap below cannot fail.
@let len = chunks.len();
@let mut chunks = chunks.into_iter();
div.result-content {
(with_newlines(&chunks.next().unwrap().1))
}
@if len > 1 {
details {
summary { "Expand" }
@for chunk in chunks {
div.result-content {
(with_newlines(&chunk.1))
}
}
}
}
}
}
}
};
EResult::Ok(page("Search", body))
}
/// State shared by all request handlers; every field is cheap to clone.
#[derive(Clone)]
struct ServerState {
/// All registered indexers.
indexers: Arc<Vec<Box<dyn Indexer>>>,
/// Database connection pool.
pool: Pool,
/// Context passed to `semantic::embed_query` when embedding search queries.
semantic: Arc<SemanticCtx>
}
impl ServerState {
fn indexer(&self, name: &str) -> EResult<&Box<dyn Indexer>> {
self.indexers.iter().find(|x| x.name() == name).ok_or(Error::NotFound)
}
}
/// Program entry point.
///
/// Connects to PostgreSQL, constructs every indexer from its `CONFIG.indexers` section, applies
/// any schema migrations not yet recorded in the `versions` table, and then runs the subcommand
/// given as the first command-line argument:
///
/// * `index` — run each indexer, then build its FTS chunks and embeddings.
/// * `serve` — start the HTTP server on 127.0.0.1:7403.
///
/// Any other subcommand only performs the migrations (a warning is logged). A missing
/// subcommand is reported as an error rather than a panic.
#[tokio::main]
async fn main() -> Result<()> {
    pretty_env_logger::init();
    let pg_config = tokio_postgres::Config::from_str(&CONFIG.database)?;
    let mgr_config = ManagerConfig { recycling_method: RecyclingMethod::Fast };
    let mgr = Manager::from_config(pg_config, NoTls, mgr_config);
    let pool = Pool::builder(mgr).max_size(20).build()?;

    // Indexing a missing key in `CONFIG.indexers` panics, so every section below must be
    // present in the configuration file.
    let indexers: Arc<Vec<Box<dyn Indexer>>> = Arc::new(vec![
        indexers::firefox_history_dump::FirefoxHistoryDumpIndexer::new(CONFIG.indexers["browser_history"].clone()).await?,
        indexers::rclwe::RclweIndexer::new(CONFIG.indexers["rclwe"].clone()).await?,
        indexers::atuin::AtuinIndexer::new(CONFIG.indexers["atuin"].clone()).await?,
        indexers::miniflux::MinifluxIndexer::new(CONFIG.indexers["miniflux"].clone()).await?,
        indexers::thunderbird_email::EmailIndexer::new(CONFIG.indexers["email"].clone()).await?,
        indexers::mediafiles::MediaFilesIndexer::new(CONFIG.indexers["media_files"].clone()).await?,
        indexers::books::BooksIndexer::new(CONFIG.indexers["books"].clone()).await?,
        indexers::textfiles::TextFilesIndexer::new(CONFIG.indexers["text_files"].clone()).await?,
        indexers::anki::AnkiIndexer::new(CONFIG.indexers["anki"].clone()).await?,
        indexers::minoteaur::MinoteaurIndexer::new(CONFIG.indexers["minoteaur"].clone()).await?
    ]);

    // Schema migrations: each indexer exposes an ordered list of SQL scripts; the `versions`
    // table stores how many of them have already been applied.
    {
        let mut conn = pool.get().await?;
        conn.execute("CREATE TABLE IF NOT EXISTS versions (
            indexer VARCHAR(256) PRIMARY KEY,
            version INTEGER NOT NULL
        )", &[]).await?;

        for indexer in indexers.iter() {
            let name = indexer.name();
            let version = conn
                .query_opt("SELECT version FROM versions WHERE indexer = $1", &[&name])
                .await?.map(|row| row.get(0)).unwrap_or(0);
            for (index, sql) in indexer.schemas().iter().enumerate() {
                let index = index as i32;
                if index >= version {
                    log::info!("Migrating {} to {}.", name, index);
                    // Each migration step and its version bump commit atomically.
                    let tx = conn.transaction().await?;
                    tx.batch_execute(*sql).await.context("execute migration")?;
                    tx.execute("INSERT INTO versions VALUES ($1, $2) ON CONFLICT (indexer) DO UPDATE SET version = $2", &[&name, &(index + 1)]).await.context("update migrations database")?;
                    // Auxiliary tables are generated once, together with the first schema.
                    if index == 0 {
                        tx.batch_execute(&generate_auxiliary_tables(indexer)).await.context("write auxiliary tables")?;
                    }
                    tx.commit().await?;
                }
            }
        }
    }

    // Previously `.unwrap()`: running the binary without arguments panicked.
    let command = std::env::args().nth(1)
        .context("missing subcommand: expected `index` or `serve`")?;

    match command.as_str() {
        "index" => {
            let sctx = Arc::new(SemanticCtx::new(pool.clone())?);
            for indexer in indexers.iter() {
                let ctx = Arc::new(indexer::Ctx {
                    pool: pool.clone()
                });
                log::info!("Indexing: {}.", indexer.name());
                indexer.run(ctx).await.context(indexer.name())?;
                log::info!("Building FTS index: {}.", indexer.name());
                semantic::fts_for_indexer(indexer, sctx.clone()).await?;
                log::info!("Done: {}.", indexer.name());
            }
            Ok(())
        },
        "serve" => {
            let state = ServerState {
                indexers: indexers.clone(),
                pool: pool.clone(),
                semantic: Arc::new(SemanticCtx::new(pool.clone())?)
            };
            // The ntex system is driven synchronously from inside the Tokio runtime, hence
            // `block_in_place`.
            tokio::task::block_in_place(|| {
                let sys = ntex::rt::System::new("httpserver");
                sys.run(|| {
                    web::HttpServer::new(move || {
                        web::App::new()
                            .state(state.clone())
                            .service(index_page)
                            .service(web::scope("/t").service(table_view).service(record_view))
                            .service(fts_page)
                            .service(ntex_files::Files::new("/", "./static"))
                    })
                    .bind(("127.0.0.1", 7403))?
                    .stop_runtime()
                    .run();
                    Ok(())
                }).context("init fail")
            })
        }
        other => {
            // Kept as a successful no-op (migrations have already run), but no longer silent.
            log::warn!("Unknown command {:?}; expected `index` or `serve`.", other);
            Ok(())
        }
    }
}

View File

@ -0,0 +1,231 @@
use std::{collections::HashMap, sync::Arc};
use futures::{StreamExt, TryStreamExt};
use pgvector::HalfVector;
use tokenizers::{tokenizer::{Error, Tokenizer}, Encoding};
use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use reqwest::{header::CONTENT_TYPE, Client};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
use std::fmt::Write;
use half::f16;
use crate::{indexer::{ColumnSpec, Indexer, TableSpec}, util::{get_column_string, CONFIG}};
/// The `semantic` section of the configuration file: settings for chunking text and
/// requesting embeddings.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct SemanticSearchConfig {
// Identifier handed to `Tokenizer::from_pretrained` in `load_tokenizer`.
tokenizer: String,
// URL that embedding requests are POSTed to (see `send_batch`).
backend: String,
// Not read in this module; presumably the dimensionality of the stored vectors — TODO confirm.
pub embedding_dim: u32,
// Not read in this module; presumably the per-chunk token limit — TODO confirm.
max_tokens: usize,
// Number of chunks sent to the backend per request; the chunk channel holds 4x this many.
batch_size: usize
}
fn convert_tokenizer_error(e: Error) -> anyhow::Error {
anyhow!("tokenizer: {}", e)
}
/// Loads the pretrained tokenizer named by `CONFIG.semantic.tokenizer`.
pub fn load_tokenizer() -> Result<Tokenizer> {
    Tokenizer::from_pretrained(&CONFIG.semantic.tokenizer, None).map_err(convert_tokenizer_error)
}
/// Splits `s` into chunks using the tokenizer's own overflow handling.
///
/// Returns one `(start, end, text)` triple for the main encoding and one for each overflowing
/// encoding. `start` and `end` come from the first and last non-`(0, 0)` token offsets, and
/// `text` is the chunk's token ids decoded back to a string with special tokens skipped.
///
/// Encodings with no usable offsets (empty, or consisting only of `(0, 0)` entries) are
/// skipped. Previously the all-`(0, 0)` case indexed into an empty vector and panicked.
///
/// # Errors
/// Returns an error if encoding or decoding fails.
pub fn tokenize_chunk_text(t: &Tokenizer, s: &str) -> Result<Vec<(usize, usize, String)>> {
    let enc = t.encode(s, false).map_err(convert_tokenizer_error)?;
    let mut result = vec![];
    let mut write = |enc: &Encoding| -> Result<()> {
        // (0, 0) offsets carry no position in the source text, so they are ignored when
        // working out the chunk's span.
        let mut spans = enc.get_offsets().iter().copied().filter(|(a, b)| *a != 0 || *b != 0);
        if let Some(first) = spans.next() {
            // A single-token chunk starts and ends on the same span.
            let last = spans.last().unwrap_or(first);
            result.push((
                first.0,
                last.1,
                t.decode(enc.get_ids(), true).map_err(convert_tokenizer_error)?
            ));
        }
        Ok(())
    };
    write(&enc)?;
    for overflowing in enc.get_overflowing() {
        write(overflowing)?;
    }
    Ok(result)
}
/// Request body sent to the embedding backend; `send_batch` serializes it as named MessagePack.
#[derive(Serialize)]
struct EmbeddingRequest<'a> {
// The texts to embed, in request order.
text: Vec<&'a str>
}
/// Response body from the embedding backend: a list of byte buffers, each of which
/// `send_batch` decodes as little-endian f16 values.
#[derive(Deserialize)]
struct EmbeddingResponse(Vec<serde_bytes::ByteBuf>);
fn decode_fp16_buffer(buf: &[u8]) -> Vec<f16> {
buf.chunks_exact(2)
.map(|chunk| half::f16::from_le_bytes([chunk[0], chunk[1]]))
.collect()
}
async fn send_batch(client: &Client, batch: Vec<&str>) -> Result<Vec<Vec<f16>>> {
let res = client.post(&CONFIG.semantic.backend)
.body(rmp_serde::to_vec_named(&EmbeddingRequest { text: batch })?)
.header(CONTENT_TYPE, "application/msgpack")
.send()
.await?;
let data: EmbeddingResponse = rmp_serde::from_read(&*res.bytes().await?)?;
Ok(data.0.into_iter().map(|x| decode_fp16_buffer(&*x)).collect())
}
/// One tokenizer-sized piece of a document column, waiting to be embedded.
struct Chunk {
// Id of the source row (taken from the table's change tracker).
id: i64,
// Name of the column the text came from.
col: &'static str,
// Start offset of the chunk within the column text, as reported by the tokenizer.
start: i32,
// End offset minus start offset.
length: i32,
// Decoded chunk text; this is what gets sent to the embedding backend.
text: String
}
/// Resources needed for chunking and embedding text and for storing the results.
pub struct SemanticCtx {
// Tokenizer used to split text into chunks.
tokenizer: Tokenizer,
// HTTP client used to talk to the embedding backend.
client: reqwest::Client,
// PostgreSQL connection pool.
pool: deadpool_postgres::Pool
}
impl SemanticCtx {
    /// Builds a context around `pool`, loading the configured tokenizer and creating a fresh
    /// HTTP client. Fails if the tokenizer cannot be loaded.
    pub fn new(pool: deadpool_postgres::Pool) -> Result<Self> {
        let tokenizer = load_tokenizer()?;
        let client = Client::new();
        Ok(Self { tokenizer, client, pool })
    }
}
/// Stores the embedded chunks of document `id` in one transaction.
///
/// Called once every chunk of the document has been embedded. The transaction removes the
/// document's change-tracker record, deletes its existing chunks from every FTS column's chunk
/// table, and inserts the new `(document, start, length, embedding)` rows.
async fn insert_fts_chunks(id: i64, chunks: Vec<(Chunk, Vec<f16>)>, table: &TableSpec, ctx: Arc<SemanticCtx>) -> Result<()> {
    let mut conn = ctx.pool.get().await?;
    let tx = conn.transaction().await?;

    let clear_tracker = format!("DELETE FROM {}_change_tracker WHERE id = $1", table.name);
    tx.execute(&clear_tracker, &[&id]).await?;

    // Only FTS-enabled columns have a chunk table.
    for col in table.columns.iter().filter(|c| c.fts) {
        let clear_chunks = format!("DELETE FROM {}_{}_fts_chunks WHERE document = $1", table.name, col.name);
        tx.execute(&clear_chunks, &[&id]).await?;
    }

    for (chunk, embedding) in chunks {
        let insert_chunk = format!("INSERT INTO {}_{}_fts_chunks VALUES ($1, $2, $3, $4)", table.name, chunk.col);
        let vector = pgvector::HalfVector::from(embedding);
        tx.execute(&insert_chunk, &[
            &id,
            &chunk.start,
            &chunk.length,
            &vector
        ]).await?;
    }

    tx.commit().await?;
    Ok(())
}
pub async fn embed_query(q: &str, ctx: Arc<SemanticCtx>) -> Result<(HalfVector, HalfVector)> {
let prefixed = format!("Represent this sentence for searching relevant passages: {}", q);
let mut result = send_batch(&ctx.client, vec![
&prefixed,
q
]).await?.into_iter();
Ok((HalfVector::from(result.next().unwrap()), HalfVector::from(result.next().unwrap())))
}
/// Rebuilds chunk embeddings for every row listed in the change trackers of indexer `i`.
///
/// For each table with at least one FTS column, two tasks run as a pipeline:
/// * `get_inputs` streams ids from `{table}_change_tracker`, loads each row, tokenizes its FTS
///   columns into `Chunk`s and pushes them into a bounded channel;
/// * `make_embeddings` batches chunks from the channel, sends them to the embedding backend,
///   and spawns `insert_fts_chunks` for a document once all of its chunks have embeddings.
///
/// Returns the first error from either task. Errors inside the spawned insert tasks are not
/// observed here.
pub async fn fts_for_indexer(i: &Box<dyn Indexer>, ctx: Arc<SemanticCtx>) -> Result<()> {
let conn = ctx.pool.get().await?;
for table in i.tables() {
let fts_columns: Arc<Vec<&ColumnSpec>> = Arc::new(table.columns.iter().filter(|x| x.fts).collect());
if !fts_columns.is_empty() {
// Build `SELECT id, <fts columns...> FROM <table> WHERE id = $1`; column i of `fts_columns`
// ends up at result index i + 1.
let mut select_one_query = format!("SELECT id");
for column in fts_columns.iter() {
write!(&mut select_one_query, ", {}", column.name).unwrap();
}
write!(&mut select_one_query, " FROM {} WHERE id = $1", table.name).unwrap();
let select_one_query = Arc::new(select_one_query);
// Bounded channel between the tokenizing stage and the embedding stage.
let (encoded_row_tx, encoded_row_rx) = mpsc::channel::<Chunk>(CONFIG.semantic.batch_size * 4);
// Document id -> (embedded chunks collected so far, number of chunks still awaiting embeddings).
let pending = Arc::new(RwLock::new(HashMap::new()));
let ctx_ = ctx.clone();
let pending_ = pending.clone();
// `[""; 0]` is an empty parameter list for `query_raw`.
let get_inputs = conn.query_raw(&format!("SELECT id FROM {}_change_tracker", table.name), [""; 0])
.await?
.map_err(anyhow::Error::from)
.try_for_each_concurrent(CONFIG.concurrency * 2, move |row| {
let ctx = ctx_.clone();
let pending = pending_.clone();
let select_one_query = select_one_query.clone();
let encoded_row_tx = encoded_row_tx.clone();
let fts_columns = fts_columns.clone();
async move {
let conn = ctx.pool.get().await?;
let id: i64 = row.get(0);
let row = conn.query_opt(&*select_one_query, &[&id]).await?;
// A tracked id whose row no longer exists is skipped without touching the tracker.
if let Some(row) = row {
let mut buffer = vec![];
for (i, col) in fts_columns.iter().enumerate() {
let s: Option<String> = get_column_string(&row, i + 1, col);
if let Some(s) = s {
// Tokenization is synchronous, so it runs under `block_in_place`.
let chunks = tokio::task::block_in_place(|| tokenize_chunk_text(&ctx.tokenizer, &s))?;
for chunk in chunks {
buffer.push(Chunk {
id,
col: col.name,
start: chunk.0 as i32,
length: (chunk.1 - chunk.0) as i32,
text: chunk.2
});
}
}
}
// The expected chunk count is registered before any chunk is sent, so the embedding stage
// always finds the entry.
// NOTE(review): a row that yields zero chunks is registered with a count of 0 and never
// reaches the `record.1 == 0` check below, so it looks like its change-tracker record is
// never cleared — confirm.
pending.write().await.insert(id, (vec![], buffer.len()));
for chunk in buffer {
encoded_row_tx.send(chunk).await?;
}
}
Ok(())
}
});
let get_inputs = tokio::task::spawn(get_inputs);
let ctx_ = ctx.clone();
let pending = pending.clone();
// The sender was moved into `get_inputs`, so this stream presumably ends once that task
// has finished and dropped its sender clones.
let make_embeddings =
ReceiverStream::new(encoded_row_rx).chunks(CONFIG.semantic.batch_size)
.map(Ok)
.try_for_each_concurrent(3, move |batch| {
let ctx = ctx_.clone();
let pending = pending.clone();
async move {
let embs = send_batch(&ctx.client, batch.iter().map(|c| c.text.as_str()).collect()).await?;
let mut pending = pending.write().await;
// Embeddings are paired with chunks positionally.
for (embedding, chunk) in embs.into_iter().zip(batch.into_iter()) {
let record = pending.get_mut(&chunk.id).unwrap();
record.1 -= 1;
let id = chunk.id;
record.0.push((chunk, embedding));
// No pending chunks to embed for record: insert into database.
if record.1 == 0 {
let record = pending.remove(&id).unwrap();
// This should generally not be rate-limiting, so don't bother with backpressure.
// NOTE(review): the task is detached; its result is never awaited or checked here.
tokio::task::spawn(insert_fts_chunks(id, record.0, table, ctx.clone()));
}
}
Result::Ok(())
}
});
let make_embeddings: tokio::task::JoinHandle<Result<()>> = tokio::task::spawn(make_embeddings);
// First `?` handles a join failure, second `?` the task's own error.
get_inputs.await??;
make_embeddings.await??;
}
}
Ok(())
}
/// Smoke test: loads the configured tokenizer and prints the chunks for a short input.
#[test]
fn test_tokenize() {
    let tokenizer = load_tokenizer().unwrap();
    let chunks = tokenize_chunk_text(&tokenizer, "test input");
    println!("{:?}", chunks);
}

View File

@ -0,0 +1,178 @@
use std::{collections::{HashMap, HashSet}, hash::{Hash, Hasher}, path::PathBuf, time::{SystemTime, UNIX_EPOCH}};
use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, Utc, NaiveDate, NaiveTime};
use regex::Regex;
use seahash::SeaHasher;
use serde::{Serialize, Deserialize};
use tokio_postgres::Row;
use percent_encoding::{utf8_percent_encode, AsciiSet, CONTROLS};
// Characters percent-encoded by `urlencode`: ASCII control characters plus space, `"`, `<`, `>` and `` ` ``.
const FRAGMENT: &AsciiSet = &CONTROLS.add(b' ').add(b'"').add(b'<').add(b'>').add(b'`');
use crate::indexer::ColumnSpec;
/// Top-level application configuration, deserialized from `./config.toml` by `load_config`.
#[derive(Serialize, Deserialize, Debug)]
pub struct Config {
// PostgreSQL connection string.
pub database: String,
// Per-indexer configuration tables, keyed by indexer section name.
pub indexers: HashMap<String, toml::Table>,
// Concurrency setting; defaults to the value of `num_cpus::get` when omitted.
#[serde(default="num_cpus::get")]
pub concurrency: usize,
// Semantic search / embedding settings.
pub semantic: crate::semantic::SemanticSearchConfig
}
fn load_config() -> Result<Config> {
let data = String::from_utf8(std::fs::read("./config.toml")?)?;
toml::from_str(&data).context("config parse failed")
}
lazy_static::lazy_static! {
// Global configuration, loaded on first access; panics if `load_config` fails.
pub static ref CONFIG: Config = load_config().unwrap();
}
/// Hashes any `Hash` value with SeaHash and returns the 64-bit digest as an `i64`.
pub fn hash_thing<H: Hash>(s: &H) -> i64 {
    let mut hasher = SeaHasher::new();
    s.hash(&mut hasher);
    // Same-width cast: reinterprets the u64 bit pattern, nothing is truncated.
    hasher.finish() as i64
}
/// Hashes a string slice via `hash_thing`.
pub fn hash_str(s: &str) -> i64 {
    hash_thing::<&str>(&s)
}
/// Builds a CSS declaration `"<property>: hsl(<hue>deg, 100%, 40%)"` whose hue is derived from
/// the hash of `s`, so the same string always gets the same colour.
///
/// The hue is normalized into `0..360`. The hash is signed, so plain `%` produced negative
/// angles for roughly half of all inputs.
pub fn hash_to_color(property: &str, s: &str) -> String {
    let hue = hash_str(s).rem_euclid(360);
    format!("{}: hsl({}deg, 100%, 40%)", property, hue)
}
pub fn systemtime_to_utc(s: SystemTime) -> Result<DateTime<Utc>> {
let duration = s.duration_since(UNIX_EPOCH)?.as_micros();
Ok(DateTime::from_timestamp_micros(duration as i64).context("invalid time")?)
}
// Lookup tables and regexes used by `parse_html`.
lazy_static::lazy_static! {
// Heading tags; text inside them is collected as a candidate title.
static ref HEADER_TAGS: HashSet<&'static [u8]> = ["h1", "h2", "h3", "h4", "h5", "h6"].into_iter().map(str::as_bytes).collect();
// Tags whose closing does NOT insert a line break into the extracted text.
static ref INLINE_TAGS: HashSet<&'static [u8]> = ["span", "sub", "sup", "small", "i", "b", "em", "strong", "strike", "d", "a", "link", "head", "font"].into_iter().map(str::as_bytes).collect();
// Tags whose text content is dropped entirely.
static ref IGNORE_TAGS: HashSet<&'static [u8]> = ["style", "script", "nav", "iframe", "svg"].into_iter().map(str::as_bytes).collect();
// Tags that insert a line break when they open.
static ref SELF_CLOSING_NEWLINE_TAGS: HashSet<&'static [u8]> = ["hr", "br"].into_iter().map(str::as_bytes).collect();
// Three or more consecutive newlines.
static ref NEWLINES: Regex = Regex::new(r"\n{3,}").unwrap();
// A newline followed by any run of whitespace.
static ref SPACE_ON_NEWLINES: Regex = Regex::new(r"\n\s+").unwrap();
}
/// Extracts plain text and a title from an HTML document.
///
/// Returns `(text, title)`. `text` is the concatenated text content outside ignored tags and
/// outside `<title>`, with line breaks inserted at non-inline closing tags and at `hr`/`br`,
/// then whitespace-normalized and trimmed. `title` is taken from `<title>`/`<dc:title>` or from
/// heading text; `prefer_title_tag` picks which source wins when both are non-empty.
pub fn parse_html(html: &[u8], prefer_title_tag: bool) -> (String, String) {
use html5gum::Token;
let mut text = String::new();
let mut title_from_header = String::new();
let mut title_from_tag = String::new();
// Names of the ignored tags that are currently open; text is only collected while this is empty.
let mut ignore_stack = vec![];
// Name of the heading tag we are currently inside, if any.
let mut current_header: Option<Vec<u8>> = None;
let mut in_title = false;
for token in html5gum::Tokenizer::new(html).infallible() {
match token {
Token::StartTag(tag) => {
let name = tag.name.0.as_slice();
if IGNORE_TAGS.contains(name) {
ignore_stack.push(tag.name.0);
continue;
}
if HEADER_TAGS.contains(name) {
current_header = Some(name.to_vec());
}
if name == b"title" || name == b"dc:title" {
in_title = true;
}
// Line break for `hr`/`br`, unless the text already ends with a blank line.
if SELF_CLOSING_NEWLINE_TAGS.contains(name) && !text.ends_with("\n\n") {
text.push('\n');
}
},
Token::String(s) => {
let s = String::from_utf8_lossy(&s);
// Only leading/trailing newlines are stripped; other whitespace is kept.
let s = s.trim_matches('\n');
if ignore_stack.is_empty() {
// Title text is kept out of the body text.
if !in_title {
text.push_str(s);
}
if in_title {
title_from_tag.push_str(s);
}
// Heading text goes to the body text (above) and is also appended here; text from every
// heading in the document accumulates in `title_from_header`.
if current_header.is_some() {
title_from_header.push_str(s);
}
}
},
Token::EndTag(tag) => {
let name = tag.name.0;
// Leave heading mode only when the matching heading tag closes.
if let Some(ref header) = current_header {
if header == &name {
current_header = None;
}
}
// Closing a non-inline tag ends the current line, capped at one blank line.
if ignore_stack.is_empty() && !INLINE_TAGS.contains(name.as_slice()) {
if !text.ends_with("\n\n") {
text.push('\n');
}
}
// Only an end tag matching the innermost open ignored tag pops the stack.
if let Some(last) = ignore_stack.last() {
if last == &name {
ignore_stack.pop();
}
}
if name.as_slice() == b"title" || name == b"dc:title" {
in_title = false;
}
},
_ => ()
}
}
let title = if prefer_title_tag {
if !title_from_tag.is_empty() { title_from_tag } else { title_from_header }
} else {
if !title_from_header.is_empty() { title_from_header } else { title_from_tag }
};
// First collapse "newline + whitespace" to a single newline, then cap remaining newline runs at two.
(NEWLINES.replace_all(&SPACE_ON_NEWLINES.replace_all(&text, "\n"), "\n\n").trim().to_string(), title)
}
/// Extracts `(text, title)` from a PDF by running the external `pdftotext` tool with
/// `-htmlmeta` and feeding its output to `parse_html` (preferring the `<title>` tag).
///
/// Fails if the process cannot be run or exits unsuccessfully; in the latter case the error
/// carries the tool's stderr.
pub async fn parse_pdf(path: &PathBuf) -> Result<(String, String)> {
    // Rust does not seem to have a robust library for this.
    let mut command = tokio::process::Command::new("pdftotext");
    command.arg("-htmlmeta").arg(path).arg("/dev/stdout");
    let output = command.output().await?;
    if output.status.success() {
        Ok(parse_html(&output.stdout, true))
    } else {
        Err(anyhow!("PDF parsing: {}", String::from_utf8_lossy(&output.stderr)))
    }
}
/// Wrong but probably-good-enough parsing of dates: accepts `YYYY-MM-DD` or `YYYYMMDD` and
/// returns midnight UTC on that day.
fn parse_naive_date(s: &str) -> Result<DateTime<Utc>> {
    let date = match NaiveDate::parse_from_str(s, "%Y-%m-%d") {
        Ok(date) => date,
        Err(_) => NaiveDate::parse_from_str(s, "%Y%m%d")?,
    };
    let midnight = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
    Ok(DateTime::from_naive_utc_and_offset(date.and_time(midnight), Utc))
}
/// Parses `s` as an RFC 3339 timestamp, falling back to the date-only formats accepted by
/// `parse_naive_date`.
pub fn parse_date(s: &str) -> Result<DateTime<Utc>> {
    match DateTime::parse_from_rfc3339(s) {
        Ok(parsed) => Ok(parsed.into()),
        Err(_) => parse_naive_date(s),
    }
}
pub fn get_column_string(row: &Row, index: usize, spec: &ColumnSpec) -> Option<String> {
if spec.is_array {
let mut out = String::new();
let xs: Option<Vec<String>> = row.get(index);
for x in xs? {
out.push_str(&x);
out.push('\n');
}
Some(out)
} else {
row.get(index)
}
}
/// Percent-encodes `s` using the `FRAGMENT` character set.
pub fn urlencode(s: &str) -> String {
    let encoded: String = utf8_percent_encode(s, FRAGMENT).collect();
    encoded
}

View File

@ -0,0 +1,79 @@
/* Global reset: size every element by its border box. */
* {
box-sizing: border-box;
}
/* Intentionally empty placeholder for page-level styles. */
body {
}
/* Left accent bar; the border colour is supplied inline per element. */
.colborder {
border-left: 10px solid;
padding-left: 10px;
}
h1, h2, h3, h4, h5, h6 {
font-weight: 500;
}
/* Square-cornered form controls; the negative margins overlap adjacent 1px borders. */
input, select {
border-radius: 0;
border: 1px solid gray;
margin-left: -1px;
margin-bottom: -1px;
padding: 0.3em;
}
input[type=search] {
width: 50%;
}
.search-bar, .search-bar input, .search-bar select {
font-size: 1.1em;
}
/* One search hit (header plus its content chunks). */
.search-result {
margin-top: 0.5em;
margin-bottom: 0.5em;
}
/* A single matched text chunk inside a search hit. */
.result-content {
margin-top: 0.5em;
padding: 0.5em;
border: 1px solid lightgray;
font-size: 0.9em;
}
/* Column blocks with a name label and bordered content. */
.wide-column {
padding: 0.2em;
margin: 0.2em;
}
.wide-column .column-name {
padding: 0.2em;
margin: 0.2em;
}
.wide-column .content {
border: 1px solid lightgray;
padding: 0.2em;
margin: 0.2em;
}
ul {
list-style-type: square;
padding-left: 1em;
}
/* Tables: collapsed borders with light cell outlines. */
table {
border-collapse: collapse;
}
td {
padding-left: 0.3em;
padding-right: 0.3em;
border: 1px solid lightgray;
}
.filters {
margin-bottom: 1em;
}