mirror of
https://github.com/osmarks/meme-search-engine.git
synced 2025-01-21 14:36:58 +00:00
fix resumption, oops
This commit is contained in:
parent
b9bb629e6f
commit
c277b49dc1
@ -32,6 +32,7 @@ fn main() -> Result<()> {
|
|||||||
let stream = zstd::stream::Decoder::new(fs::File::open(path)?)?;
|
let stream = zstd::stream::Decoder::new(fs::File::open(path)?)?;
|
||||||
let mut stream = BufReader::new(stream);
|
let mut stream = BufReader::new(stream);
|
||||||
let mut latest_timestamp = 0;
|
let mut latest_timestamp = 0;
|
||||||
|
let mut earliest_timestamp = u64::MAX;
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
loop {
|
loop {
|
||||||
let res: Result<ProcessedEntry, DecodeError> = rmp_serde::from_read(&mut stream);
|
let res: Result<ProcessedEntry, DecodeError> = rmp_serde::from_read(&mut stream);
|
||||||
@ -41,14 +42,15 @@ fn main() -> Result<()> {
|
|||||||
match res {
|
match res {
|
||||||
Ok(x) => {
|
Ok(x) => {
|
||||||
if x.timestamp > latest_timestamp {
|
if x.timestamp > latest_timestamp {
|
||||||
println!("{} {} https://reddit.com/r/{}/comments/{} {} https://mse.osmarks.net/?e={}", x.timestamp, count, x.subreddit, x.id, x.metadata.final_url, URL_SAFE.encode(x.embedding));
|
//println!("{} {} https://reddit.com/r/{}/comments/{} {} https://mse.osmarks.net/?e={}", x.timestamp, count, x.subreddit, x.id, x.metadata.final_url, URL_SAFE.encode(x.embedding));
|
||||||
latest_timestamp = x.timestamp;
|
latest_timestamp = x.timestamp;
|
||||||
}
|
}
|
||||||
|
earliest_timestamp = earliest_timestamp.min(x.timestamp);
|
||||||
},
|
},
|
||||||
Err(DecodeError::InvalidDataRead(x)) | Err(DecodeError::InvalidMarkerRead(x)) if x.kind() == std::io::ErrorKind::UnexpectedEof => break,
|
Err(DecodeError::InvalidDataRead(x)) | Err(DecodeError::InvalidMarkerRead(x)) if x.kind() == std::io::ErrorKind::UnexpectedEof => break,
|
||||||
Err(e) => return Err(e).context("decode fail")
|
Err(e) => return Err(e).context("decode fail")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
println!("{} {}", latest_timestamp, count);
|
println!("{} {} {}", earliest_timestamp, latest_timestamp, count);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use anyhow::{anyhow, Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
use common::resize_for_embed;
|
use common::resize_for_embed;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use std::{collections::HashSet, fs, io::{BufReader, Cursor, BufRead, BufWriter}, time::Duration, sync::Arc, str::FromStr, path::PathBuf};
|
use std::{collections::HashSet, ffi::OsStr, fs::{self, read_dir}, io::{BufRead, BufReader, BufWriter, Cursor}, path::PathBuf, str::FromStr, sync::Arc, time::Duration};
|
||||||
use serde::{Serialize, Deserialize};
|
use serde::{Serialize, Deserialize};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use regex::{bytes, Regex, RegexSet, RegexSetBuilder};
|
use regex::{bytes, Regex, RegexSet, RegexSetBuilder};
|
||||||
@ -261,8 +261,8 @@ async fn fetch_file(client: reqwest::Client, config: Arc<Config>, url: &str) ->
|
|||||||
Ok((buffer, content_type, response.url().to_string()))
|
Ok((buffer, content_type, response.url().to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_output(config: Arc<Config>, mut rx: Receiver<ProcessedEntry>) -> Result<()> {
|
fn write_output(config: Arc<Config>, mut rx: Receiver<ProcessedEntry>, seqnum: usize) -> Result<()> {
|
||||||
let mut out = fs::File::options().create(true).append(true).open(&config.output)?;
|
let mut out = fs::File::create(PathBuf::from(&config.output).join(format!("{}.dump-zst", seqnum)))?;
|
||||||
let stream = zstd::Encoder::new(&mut out, 15)?.auto_finish();
|
let stream = zstd::Encoder::new(&mut out, 15)?.auto_finish();
|
||||||
let mut buf_stream = BufWriter::new(stream);
|
let mut buf_stream = BufWriter::new(stream);
|
||||||
while let Some(x) = rx.blocking_recv() {
|
while let Some(x) = rx.blocking_recv() {
|
||||||
@ -279,9 +279,22 @@ enum OperatingMode {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument]
|
#[instrument]
|
||||||
fn readback_output(path: &str) -> Result<(u64, usize)> {
|
fn readback_output(path: &str) -> Result<(u64, usize, usize)> {
|
||||||
use rmp_serde::decode::Error;
|
use rmp_serde::decode::Error;
|
||||||
let stream = zstd::stream::Decoder::new(fs::File::open(path)?)?;
|
|
||||||
|
let mut highest_seqnum: Option<usize> = None;
|
||||||
|
for entry in read_dir(path)? {
|
||||||
|
let entry = entry?;
|
||||||
|
let path = entry.path();
|
||||||
|
if path.extension().and_then(OsStr::to_str).map(|x| x == "dump-zst").unwrap_or(false) {
|
||||||
|
let seqnum = path.file_stem().context("invalid file structure")?.to_str().context("non-UTF8 path")?.parse::<usize>().context("invalid file name")?;
|
||||||
|
highest_seqnum = Some(highest_seqnum.map(|x| x.max(seqnum)).unwrap_or(seqnum));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let seqnum = highest_seqnum.context("no files found")?;
|
||||||
|
|
||||||
|
let stream = zstd::stream::Decoder::new(fs::File::open(PathBuf::from(path).join(&format!("{}.dump-zst", seqnum)))?)?;
|
||||||
let mut stream = BufReader::new(stream);
|
let mut stream = BufReader::new(stream);
|
||||||
let mut latest_timestamp = 0;
|
let mut latest_timestamp = 0;
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
@ -296,7 +309,7 @@ fn readback_output(path: &str) -> Result<(u64, usize)> {
|
|||||||
Err(e) => return Err(e).context("decode fail")
|
Err(e) => return Err(e).context("decode fail")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok((latest_timestamp, count))
|
Ok((latest_timestamp, count, seqnum))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn serve_metrics(config: Arc<Config>) -> Result<()> {
|
async fn serve_metrics(config: Arc<Config>) -> Result<()> {
|
||||||
@ -323,10 +336,10 @@ async fn main() -> Result<()> {
|
|||||||
let config = Arc::new(Config {
|
let config = Arc::new(Config {
|
||||||
max_content_length: 1<<24,
|
max_content_length: 1<<24,
|
||||||
input: String::from("./reddit_subs_202212/"),
|
input: String::from("./reddit_subs_202212/"),
|
||||||
output: String::from("./sample.zst"),
|
output: String::from("."),
|
||||||
backend: String::from("http://localhost:1708"),
|
backend: String::from("http://localhost:1708"),
|
||||||
mode: OperatingMode::FullRun,
|
mode: OperatingMode::FullRun,
|
||||||
filename_threshold: None,
|
filename_threshold: Some(String::from("RS_2017-08.zst")),
|
||||||
metrics_addr: String::from("0.0.0.0:9914"),
|
metrics_addr: String::from("0.0.0.0:9914"),
|
||||||
contact_info: String::from("scraping-ops@osmarks.net")
|
contact_info: String::from("scraping-ops@osmarks.net")
|
||||||
});
|
});
|
||||||
@ -346,9 +359,10 @@ async fn main() -> Result<()> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let mut seqnum = 0;
|
||||||
if let Some((threshold, count)) = timestamp_threshold {
|
if let Some((threshold, count, existing_seqnum)) = timestamp_threshold {
|
||||||
tracing::info!("threshold is {}, {} items", threshold, count);
|
tracing::info!("threshold is {}, {} items, seq {}", threshold, count, existing_seqnum);
|
||||||
|
seqnum = existing_seqnum + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
let backend = get_backend_config(&config.backend).await;
|
let backend = get_backend_config(&config.backend).await;
|
||||||
@ -487,7 +501,7 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
let config_ = config.clone();
|
let config_ = config.clone();
|
||||||
let output_writer_task = match config.mode {
|
let output_writer_task = match config.mode {
|
||||||
OperatingMode::Sample(_) | OperatingMode::FullRun => Some(tokio::task::spawn_blocking(move || write_output(config_, final_write_rx))),
|
OperatingMode::Sample(_) | OperatingMode::FullRun => Some(tokio::task::spawn_blocking(move || write_output(config_, final_write_rx, seqnum))),
|
||||||
_ => None
|
_ => None
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -521,7 +535,7 @@ async fn main() -> Result<()> {
|
|||||||
let path_ = path.clone();
|
let path_ = path.clone();
|
||||||
tracing::info!("reading {:?}", path);
|
tracing::info!("reading {:?}", path);
|
||||||
file_readers.spawn_blocking(move || {
|
file_readers.spawn_blocking(move || {
|
||||||
match process_file(path_, entries_tx, timestamp_threshold.map(|(x, _)| x)) {
|
match process_file(path_, entries_tx, timestamp_threshold.map(|(x, _, _)| x)) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(e) => tracing::error!("could not parse {:?} {:?}", &path, e)
|
Err(e) => tracing::error!("could not parse {:?} {:?}", &path, e)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user