From c277b49dc16bf4dbbb6ee4c208bdfb0e529d0c71 Mon Sep 17 00:00:00 2001
From: osmarks
Date: Thu, 7 Nov 2024 20:43:26 +0000
Subject: [PATCH] fix resumption, oops

---
 src/dump_processor.rs |  6 ++++--
 src/reddit_dump.rs    | 40 +++++++++++++++++++++++++++-------------
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/src/dump_processor.rs b/src/dump_processor.rs
index d74b32a..ea345dc 100644
--- a/src/dump_processor.rs
+++ b/src/dump_processor.rs
@@ -32,6 +32,7 @@ fn main() -> Result<()> {
     let stream = zstd::stream::Decoder::new(fs::File::open(path)?)?;
     let mut stream = BufReader::new(stream);
     let mut latest_timestamp = 0;
+    let mut earliest_timestamp = u64::MAX;
     let mut count = 0;
     loop {
         let res: Result<ProcessedEntry, DecodeError> = rmp_serde::from_read(&mut stream);
@@ -41,14 +42,15 @@
         match res {
             Ok(x) => {
                 if x.timestamp > latest_timestamp {
-                    println!("{} {} https://reddit.com/r/{}/comments/{} {} https://mse.osmarks.net/?e={}", x.timestamp, count, x.subreddit, x.id, x.metadata.final_url, URL_SAFE.encode(x.embedding));
+                    //println!("{} {} https://reddit.com/r/{}/comments/{} {} https://mse.osmarks.net/?e={}", x.timestamp, count, x.subreddit, x.id, x.metadata.final_url, URL_SAFE.encode(x.embedding));
                     latest_timestamp = x.timestamp;
                 }
+                earliest_timestamp = earliest_timestamp.min(x.timestamp);
             },
             Err(DecodeError::InvalidDataRead(x)) | Err(DecodeError::InvalidMarkerRead(x)) if x.kind() == std::io::ErrorKind::UnexpectedEof => break,
             Err(e) => return Err(e).context("decode fail")
         }
     }
-    println!("{} {}", latest_timestamp, count);
+    println!("{} {} {}", earliest_timestamp, latest_timestamp, count);
     Ok(())
 }
diff --git a/src/reddit_dump.rs b/src/reddit_dump.rs
index 70d41f8..1c7cde1 100644
--- a/src/reddit_dump.rs
+++ b/src/reddit_dump.rs
@@ -1,7 +1,7 @@
 use anyhow::{anyhow, Context, Result};
 use common::resize_for_embed;
 use itertools::Itertools;
-use std::{collections::HashSet, fs, io::{BufReader, Cursor, BufRead, BufWriter}, time::Duration, sync::Arc, str::FromStr, path::PathBuf};
+use std::{collections::HashSet, ffi::OsStr, fs::{self, read_dir}, io::{BufRead, BufReader, BufWriter, Cursor}, path::PathBuf, str::FromStr, sync::Arc, time::Duration};
 use serde::{Serialize, Deserialize};
 use lazy_static::lazy_static;
 use regex::{bytes, Regex, RegexSet, RegexSetBuilder};
@@ -261,8 +261,8 @@ async fn fetch_file(client: reqwest::Client, config: Arc<Config>, url: &str) ->
     Ok((buffer, content_type, response.url().to_string()))
 }
 
-fn write_output(config: Arc<Config>, mut rx: Receiver<ProcessedEntry>) -> Result<()> {
-    let mut out = fs::File::options().create(true).append(true).open(&config.output)?;
+fn write_output(config: Arc<Config>, mut rx: Receiver<ProcessedEntry>, seqnum: usize) -> Result<()> {
+    let mut out = fs::File::create(PathBuf::from(&config.output).join(format!("{}.dump-zst", seqnum)))?;
     let stream = zstd::Encoder::new(&mut out, 15)?.auto_finish();
     let mut buf_stream = BufWriter::new(stream);
     while let Some(x) = rx.blocking_recv() {
@@ -279,9 +279,22 @@ enum OperatingMode {
 }
 
 #[instrument]
-fn readback_output(path: &str) -> Result<(u64, usize)> {
+fn readback_output(path: &str) -> Result<(u64, usize, usize)> {
     use rmp_serde::decode::Error;
-    let stream = zstd::stream::Decoder::new(fs::File::open(path)?)?;
+
+    let mut highest_seqnum: Option<usize> = None;
+    for entry in read_dir(path)? {
+        let entry = entry?;
+        let path = entry.path();
+        if path.extension().and_then(OsStr::to_str).map(|x| x == "dump-zst").unwrap_or(false) {
+            let seqnum = path.file_stem().context("invalid file structure")?.to_str().context("non-UTF8 path")?.parse::<usize>().context("invalid file name")?;
+            highest_seqnum = Some(highest_seqnum.map(|x| x.max(seqnum)).unwrap_or(seqnum));
+        }
+    }
+
+    let seqnum = highest_seqnum.context("no files found")?;
+
+    let stream = zstd::stream::Decoder::new(fs::File::open(PathBuf::from(path).join(&format!("{}.dump-zst", seqnum)))?)?;
     let mut stream = BufReader::new(stream);
     let mut latest_timestamp = 0;
     let mut count = 0;
@@ -296,7 +309,7 @@ fn readback_output(path: &str) -> Result<(u64, usize)> {
             Err(e) => return Err(e).context("decode fail")
         }
     }
-    Ok((latest_timestamp, count))
+    Ok((latest_timestamp, count, seqnum))
 }
 
 async fn serve_metrics(config: Arc<Config>) -> Result<()> {
@@ -323,10 +336,10 @@ async fn main() -> Result<()> {
     let config = Arc::new(Config {
        max_content_length: 1<<24,
        input: String::from("./reddit_subs_202212/"),
-       output: String::from("./sample.zst"),
+       output: String::from("."),
        backend: String::from("http://localhost:1708"),
        mode: OperatingMode::FullRun,
-       filename_threshold: None,
+       filename_threshold: Some(String::from("RS_2017-08.zst")),
        metrics_addr: String::from("0.0.0.0:9914"),
        contact_info: String::from("scraping-ops@osmarks.net")
     });
@@ -346,9 +359,10 @@
         }
     };
-
-    if let Some((threshold, count)) = timestamp_threshold {
-        tracing::info!("threshold is {}, {} items", threshold, count);
+    let mut seqnum = 0;
+    if let Some((threshold, count, existing_seqnum)) = timestamp_threshold {
+        tracing::info!("threshold is {}, {} items, seq {}", threshold, count, existing_seqnum);
+        seqnum = existing_seqnum + 1;
     }
 
     let backend = get_backend_config(&config.backend).await;
 
@@ -487,7 +501,7 @@
 
     let config_ = config.clone();
     let output_writer_task = match config.mode {
-        OperatingMode::Sample(_) | OperatingMode::FullRun => Some(tokio::task::spawn_blocking(move || write_output(config_, final_write_rx))),
+        OperatingMode::Sample(_) | OperatingMode::FullRun => Some(tokio::task::spawn_blocking(move || write_output(config_, final_write_rx, seqnum))),
         _ => None
     };
@@ -521,7 +535,7 @@
         let path_ = path.clone();
         tracing::info!("reading {:?}", path);
         file_readers.spawn_blocking(move || {
-            match process_file(path_, entries_tx, timestamp_threshold.map(|(x, _)| x)) {
+            match process_file(path_, entries_tx, timestamp_threshold.map(|(x, _, _)| x)) {
                Ok(_) => (),
                Err(e) => tracing::error!("could not parse {:?} {:?}", &path, e)
            }
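
Note (not part of the patch): a minimal sketch, using only std, of the resumption scheme this change introduces. Output is no longer appended to a single ./sample.zst; each run writes a fresh {seqnum}.dump-zst under the output directory, and on startup the highest-numbered dump is read back to recover the timestamp threshold while new output goes to the next sequence number. Appending to one file means a crash can leave a truncated zstd frame in the middle of the stream, which is presumably the resumption bug being fixed (the commit message only says "fix resumption"). The helpers below (highest_seqnum, resume_paths) are illustrative names, not code from this repository.

    // Illustrative sketch only; the real logic lives in readback_output and main.
    use std::ffi::OsStr;
    use std::fs::read_dir;
    use std::path::PathBuf;

    // Find the highest N among "N.dump-zst" files in `dir`, mirroring the scan
    // in readback_output. Unparseable stems are skipped here; the patch itself
    // treats them as hard errors instead.
    fn highest_seqnum(dir: &str) -> std::io::Result<Option<usize>> {
        let mut highest: Option<usize> = None;
        for entry in read_dir(dir)? {
            let path = entry?.path();
            // Only files with the .dump-zst extension participate in resumption.
            if path.extension().and_then(OsStr::to_str) == Some("dump-zst") {
                if let Some(n) = path
                    .file_stem()
                    .and_then(OsStr::to_str)
                    .and_then(|s| s.parse::<usize>().ok())
                {
                    highest = Some(highest.map_or(n, |h| h.max(n)));
                }
            }
        }
        Ok(highest)
    }

    // Decide what to read back and where to write next: recover the threshold
    // from the newest dump (if any) and always create a brand-new file for
    // new output, never appending to a possibly truncated one.
    fn resume_paths(dir: &str) -> std::io::Result<(Option<PathBuf>, PathBuf)> {
        let next = match highest_seqnum(dir)? {
            Some(n) => n + 1,
            None => 0,
        };
        let readback = next
            .checked_sub(1)
            .map(|n| PathBuf::from(dir).join(format!("{}.dump-zst", n)));
        Ok((readback, PathBuf::from(dir).join(format!("{}.dump-zst", next))))
    }

One file per run keeps any crash damage confined to the newest dump, at the cost that readback only consults that single newest file for its threshold, which matches what readback_output does after this patch.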